From 8ce76120fea2d89652ddd17ab8f580e16af52dc1 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Tue, 2 Feb 2021 09:57:56 +0000 Subject: [PATCH] Revert coroutine changes for core & rpc_test (#3081) * Revert coroutine changes for core & rpc_test * Fix merge --- CMakeLists.txt | 13 ++----- ci/cmake-format-all.sh | 0 nano/load_test/CMakeLists.txt | 3 +- nano/node/CMakeLists.txt | 2 - nano/node/network.cpp | 49 +++++++++---------------- nano/node/network.hpp | 3 -- nano/node/node.hpp | 7 ---- nano/rpc_test/CMakeLists.txt | 9 +---- nano/rpc_test/rpc.cpp | 69 +++++++++++++++++++++-------------- 9 files changed, 65 insertions(+), 90 deletions(-) mode change 100644 => 100755 ci/cmake-format-all.sh diff --git a/CMakeLists.txt b/CMakeLists.txt index 57cfded2a..4f847563e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -347,16 +347,8 @@ set(Boost_USE_MULTITHREADED ON) list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/Modules") -find_package( - Boost 1.69.0 REQUIRED - COMPONENTS coroutine - context - filesystem - log - log_setup - thread - program_options - system) +find_package(Boost 1.69.0 REQUIRED COMPONENTS filesystem log log_setup thread + program_options system) # RocksDB include_directories(rocksdb/include) @@ -549,6 +541,7 @@ if(NANO_FUZZER_TEST) endif() if(NANO_TEST OR RAIBLOCKS_TEST) + find_package(Boost 1.69.0 REQUIRED COMPONENTS coroutine context) if(WIN32) if(MSVC_VERSION) if(MSVC_VERSION GREATER_EQUAL 1910) diff --git a/ci/cmake-format-all.sh b/ci/cmake-format-all.sh old mode 100644 new mode 100755 diff --git a/nano/load_test/CMakeLists.txt b/nano/load_test/CMakeLists.txt index 0171f3e58..921e5122e 100644 --- a/nano/load_test/CMakeLists.txt +++ b/nano/load_test/CMakeLists.txt @@ -7,4 +7,5 @@ target_link_libraries( test_common gtest Boost::boost - Boost::coroutine) + Boost::coroutine + Boost::context) diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 38c46b623..e40b7367c 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -152,8 +152,6 @@ target_link_libraries( libminiupnpc-static argon2 lmdb - Boost::coroutine - Boost::context Boost::filesystem Boost::log_setup Boost::log diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 4c3f24c3c..65588fb13 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -22,10 +22,7 @@ publish_filter (256 * 1024), udp_channels (node_a, port_a), tcp_channels (node_a), port (port_a), -disconnect_observer ([]() {}), -cleanup_timer{ node_a.io_ctx }, -cookie_timer{ node_a.io_ctx }, -keepalive_timer{ node_a.io_ctx } +disconnect_observer ([]() {}) { boost::thread::attributes attrs; nano::thread_attributes::set (attrs); @@ -136,9 +133,6 @@ void nano::network::stop () { thread.join (); } - cleanup_timer.cancel (); - cookie_timer.cancel (); - keepalive_timer.cancel (); } } @@ -737,47 +731,38 @@ void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutof void nano::network::ongoing_cleanup () { - node.spawn ( - [this](boost::asio::yield_context yield) { - boost::system::error_code ec; - while (!stopped && !ec) + cleanup (std::chrono::steady_clock::now () - node.network_params.node.cutoff); + std::weak_ptr node_w (node.shared ()); + node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.node.period, [node_w]() { + if (auto node_l = node_w.lock ()) { - cleanup (std::chrono::steady_clock::now () - node.network_params.node.cutoff); - cleanup_timer.expires_from_now (node.network_params.node.period); - cleanup_timer.async_wait (yield[ec]); + node_l->network.ongoing_cleanup (); } - debug_assert (stopped || ec == boost::asio::error::operation_aborted); }); } void nano::network::ongoing_syn_cookie_cleanup () { - node.spawn ( - [this](boost::asio::yield_context yield) { - boost::system::error_code ec; - while (!stopped && !ec) + syn_cookies.purge (std::chrono::steady_clock::now () - nano::transport::syn_cookie_cutoff); + std::weak_ptr node_w (node.shared ()); + node.workers.add_timed_task (std::chrono::steady_clock::now () + (nano::transport::syn_cookie_cutoff * 2), [node_w]() { + if (auto node_l = node_w.lock ()) { - this->syn_cookies.purge (std::chrono::steady_clock::now () - nano::transport::syn_cookie_cutoff); - cookie_timer.expires_from_now (nano::transport::syn_cookie_cutoff * 2); - cookie_timer.async_wait (yield[ec]); + node_l->network.ongoing_syn_cookie_cleanup (); } - debug_assert (stopped || ec == boost::asio::error::operation_aborted); }); } void nano::network::ongoing_keepalive () { - node.spawn ( - [this](boost::asio::yield_context yield) { - boost::system::error_code ec; - while (!stopped && !ec) + flood_keepalive (0.75f); + flood_keepalive_self (0.25f); + std::weak_ptr node_w (node.shared ()); + node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.node.half_period, [node_w]() { + if (auto node_l = node_w.lock ()) { - flood_keepalive (0.75f); - flood_keepalive_self (0.25f); - keepalive_timer.expires_from_now (node.network_params.node.half_period); - keepalive_timer.async_wait (yield[ec]); + node_l->network.ongoing_keepalive (); } - debug_assert (stopped || ec == boost::asio::error::operation_aborted); }); } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index e492d94e6..1f9988f33 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -195,9 +195,6 @@ public: // Called when a new channel is observed std::function)> channel_observer; std::atomic stopped{ false }; - boost::asio::steady_timer cleanup_timer; - boost::asio::steady_timer cookie_timer; - boost::asio::steady_timer keepalive_timer; static unsigned const broadcast_interval_ms = 10; static size_t const buffer_size = 512; static size_t const confirm_req_hashes_max = 7; diff --git a/nano/node/node.hpp b/nano/node/node.hpp index d18209f08..c374e3dbe 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -1,6 +1,5 @@ #pragma once -#include #include #include #include @@ -94,12 +93,6 @@ public: { io_ctx.post (action_a); } - template - void spawn (Params... args) - { - boost::coroutines::attributes attributes{ boost::coroutines::stack_allocator::traits_type::default_size () * (is_sanitizer_build ? 2 : 1) }; - boost::asio::spawn (io_ctx, std::forward (args)..., attributes); - } bool copy_with_compaction (boost::filesystem::path const &); void keepalive (std::string const &, uint16_t); void start (); diff --git a/nano/rpc_test/CMakeLists.txt b/nano/rpc_test/CMakeLists.txt index 16cf63f83..84bd77f87 100644 --- a/nano/rpc_test/CMakeLists.txt +++ b/nano/rpc_test/CMakeLists.txt @@ -1,13 +1,6 @@ add_executable(rpc_test entry.cpp rpc.cpp) -target_link_libraries( - rpc_test - node - secure - rpc - test_common - gtest - Boost::coroutine) +target_link_libraries(rpc_test node secure rpc test_common gtest) target_compile_definitions( rpc_test diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 1fcfa8532..fab3ff124 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -27,26 +26,23 @@ class test_response { public: test_response (boost::property_tree::ptree const & request_a, boost::asio::io_context & io_ctx_a) : - request (request_a) + request (request_a), + sock (io_ctx_a) { } test_response (boost::property_tree::ptree const & request_a, uint16_t port_a, boost::asio::io_context & io_ctx_a) : - request (request_a) + request (request_a), + sock (io_ctx_a) { - run (port_a, io_ctx_a); + run (port_a); } - void run (uint16_t port_a, boost::asio::io_context & io_ctx_a) + void run (uint16_t port_a) { - boost::asio::spawn (io_ctx_a, [this, &io_ctx_a, port_a](boost::asio::yield_context yield) { - boost::asio::ip::tcp::socket sock (io_ctx_a); - boost::beast::flat_buffer sb; - boost::beast::http::request req; - - try + sock.async_connect (nano::tcp_endpoint (boost::asio::ip::address_v6::loopback (), port_a), [this](boost::system::error_code const & ec) { + if (!ec) { - sock.async_connect (nano::tcp_endpoint (boost::asio::ip::address_v6::loopback (), port_a), yield); std::stringstream ostream; boost::property_tree::write_json (ostream, request); req.method (boost::beast::http::verb::post); @@ -55,27 +51,46 @@ public: ostream.flush (); req.body () = ostream.str (); req.prepare_payload (); - boost::beast::http::async_write (sock, req, yield); - boost::beast::http::async_read (sock, sb, resp, yield); - std::stringstream body (resp.body ()); - try - { - boost::property_tree::read_json (body, json); - status = 200; - } - catch (std::exception const &) - { - status = 400; - } + boost::beast::http::async_write (sock, req, [this](boost::system::error_code const & ec, size_t bytes_transferred) { + if (!ec) + { + boost::beast::http::async_read (sock, sb, resp, [this](boost::system::error_code const & ec, size_t bytes_transferred) { + if (!ec) + { + std::stringstream body (resp.body ()); + try + { + boost::property_tree::read_json (body, json); + status = 200; + } + catch (std::exception &) + { + status = 500; + } + } + else + { + status = 400; + } + }); + } + else + { + status = 600; + } + }); } - catch (boost::system::error_code const &) + else { status = 400; } }); } boost::property_tree::ptree const & request; + boost::asio::ip::tcp::socket sock; boost::property_tree::ptree json; + boost::beast::flat_buffer sb; + boost::beast::http::request req; boost::beast::http::response resp; std::atomic status{ 0 }; }; @@ -7259,8 +7274,8 @@ TEST (rpc, simultaneous_calls) std::atomic count{ num }; for (int i = 0; i < num; ++i) { - std::thread ([&test_responses, &promise, &count, i, port = rpc.config.port, &io_ctx = system.io_ctx]() { - test_responses[i]->run (port, io_ctx); + std::thread ([&test_responses, &promise, &count, i, port = rpc.config.port]() { + test_responses[i]->run (port); if (--count == 0) { promise.set_value ();