diff --git a/CMakeLists.txt b/CMakeLists.txt index 3e015339..5392db4e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -426,6 +426,8 @@ if (NANO_FUZZER_TEST) endif () if (NANO_TEST OR RAIBLOCKS_TEST) + find_package (Boost 1.69.0 REQUIRED COMPONENTS coroutine) + if(WIN32) if(MSVC_VERSION) if(MSVC_VERSION GREATER_EQUAL 1910) diff --git a/nano/boost/asio/spawn.hpp b/nano/boost/asio/spawn.hpp new file mode 100644 index 00000000..97bdc5cc --- /dev/null +++ b/nano/boost/asio/spawn.hpp @@ -0,0 +1,7 @@ +#pragma once + +#include + +DISABLE_ASIO_WARNINGS +#include +REENABLE_WARNINGS diff --git a/nano/load_test/CMakeLists.txt b/nano/load_test/CMakeLists.txt index 08f49e68..a9110671 100644 --- a/nano/load_test/CMakeLists.txt +++ b/nano/load_test/CMakeLists.txt @@ -1,7 +1,7 @@ add_executable (load_test entry.cpp) -target_link_libraries (load_test node secure test_common gtest Boost::boost) +target_link_libraries (load_test node secure test_common gtest Boost::boost Boost::coroutine) target_compile_definitions(load_test PRIVATE diff --git a/nano/load_test/entry.cpp b/nano/load_test/entry.cpp index c1ef5ecb..e9e50ee6 100644 --- a/nano/load_test/entry.cpp +++ b/nano/load_test/entry.cpp @@ -1,7 +1,7 @@ #include #include #include -#include +#include #include #include #include @@ -67,12 +67,6 @@ void write_config_files (boost::filesystem::path const & data_path, int index) toml_rpc.write (nano::get_rpc_toml_config_path (data_path)); } -// Report a failure -void fail (boost::system::error_code ec, char const * what) -{ - std::cerr << what << ": " << ec.message () << "\n"; -} - class account final { public: @@ -95,244 +89,105 @@ public: bool error{ false }; }; -class receive_session final : public std::enable_shared_from_this +void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet, std::string const & source, std::string const & destination, std::atomic & send_calls_remaining, tcp::resolver::results_type const & results, boost::asio::yield_context yield) { -public: - receive_session (boost::asio::io_context & ioc, std::atomic & send_calls_remaining, std::string const & wallet, std::string const & account, std::string const & block, tcp::resolver::results_type const & results) : - socket (ioc), - strand (socket.get_executor ()), - send_calls_remaining (send_calls_remaining), - wallet (wallet), - account (account), - block (block), - results (results) - { - } - - void run () - { - auto this_l (shared_from_this ()); - - boost::asio::async_connect (this_l->socket, this_l->results.cbegin (), this_l->results.cend (), boost::asio::bind_executor (strand, [this_l](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator) { - if (ec) - { - return fail (ec, "connect"); - } - - boost::property_tree::ptree request; - request.put ("action", "receive"); - request.put ("wallet", this_l->wallet); - request.put ("account", this_l->account); - request.put ("block", this_l->block); - std::stringstream ostream; - boost::property_tree::write_json (ostream, request); - - this_l->req.method (http::verb::post); - this_l->req.version (11); - this_l->req.target ("/"); - this_l->req.body () = ostream.str (); - this_l->req.prepare_payload (); - - http::async_write (this_l->socket, this_l->req, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code ec, std::size_t) { - if (ec) - { - return fail (ec, "write"); - } - - http::async_read (this_l->socket, this_l->buffer, this_l->res, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code ec, std::size_t) { - if (ec) - { - return fail (ec, "read"); - } - - --this_l->send_calls_remaining; - - // Gracefully close the socket - this_l->socket.shutdown (tcp::socket::shutdown_both, ec); - if (ec && ec != boost::system::errc::not_connected) - { - return fail (ec, "shutdown"); - } - })); - })); - })); - } - -private: - socket_type socket; - boost::asio::strand strand; boost::beast::flat_buffer buffer; http::request req; http::response res; - std::atomic & send_calls_remaining; - std::string wallet; - std::string account; - std::string block; - tcp::resolver::results_type const & results; -}; + socket_type socket (io_ctx); -class send_session final : public std::enable_shared_from_this -{ -public: - send_session (boost::asio::io_context & ioc, std::atomic & send_calls_remaining, std::string const & wallet, std::string const & source, std::string const & destination, tcp::resolver::results_type const & results) : - io_ctx (ioc), - socket (ioc), - strand (socket.get_executor ()), - send_calls_remaining (send_calls_remaining), - wallet (wallet), - source (source), - destination (destination), - results (results) - { - } + boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield); - void run () - { - auto this_l (shared_from_this ()); - - boost::asio::async_connect (this_l->socket, this_l->results.cbegin (), this_l->results.cend (), boost::asio::bind_executor (strand, [this_l](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator) { - if (ec) - { - return fail (ec, "connect"); - } - - boost::property_tree::ptree request; - request.put ("action", "send"); - request.put ("wallet", this_l->wallet); - request.put ("source", this_l->source); - request.put ("destination", this_l->destination); - request.put ("amount", "1"); - std::stringstream ostream; - boost::property_tree::write_json (ostream, request); - - this_l->req.method (http::verb::post); - this_l->req.version (11); - this_l->req.target ("/"); - this_l->req.body () = ostream.str (); - this_l->req.prepare_payload (); - - http::async_write (this_l->socket, this_l->req, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code ec, std::size_t) { - if (ec) - { - return fail (ec, "write"); - } - - http::async_read (this_l->socket, this_l->buffer, this_l->res, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code ec, std::size_t) { - if (ec) - { - return fail (ec, "read"); - } - - boost::property_tree::ptree json; - std::stringstream body (this_l->res.body ()); - boost::property_tree::read_json (body, json); - auto block = json.get ("block"); - - std::make_shared (this_l->io_ctx, this_l->send_calls_remaining, this_l->wallet, this_l->destination, block, this_l->results)->run (); - - this_l->socket.shutdown (tcp::socket::shutdown_both, ec); - if (ec && ec != boost::system::errc::not_connected) - { - return fail (ec, "shutdown"); - } - })); - })); - })); - } - -private: - boost::asio::io_context & io_ctx; - socket_type socket; - boost::asio::strand strand; - boost::beast::flat_buffer buffer; - http::request req; - http::response res; - std::atomic & send_calls_remaining; - std::string wallet; - std::string source; - std::string destination; - tcp::resolver::results_type const & results; -}; - -class rpc_session final : public std::enable_shared_from_this -{ -public: - rpc_session (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results, std::function callback) : - io_ctx (ioc), - socket (ioc), - request (request), - results (results), - callback (callback) - { - } - - void run () - { - auto this_l (shared_from_this ()); - boost::asio::async_connect (this_l->socket, this_l->results.cbegin (), this_l->results.cend (), [this_l](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator) { - if (ec) - { - fail (ec, "connect"); - } - - std::stringstream ostream; - boost::property_tree::write_json (ostream, this_l->request); - - this_l->req.method (http::verb::post); - this_l->req.version (11); - this_l->req.target ("/"); - this_l->req.body () = ostream.str (); - this_l->req.prepare_payload (); - - http::async_write (this_l->socket, this_l->req, [this_l](boost::system::error_code ec, std::size_t) { - if (ec) - { - fail (ec, "write"); - } - - http::async_read (this_l->socket, this_l->buffer, this_l->res, [this_l](boost::system::error_code ec, std::size_t) { - if (ec) - { - fail (ec, "read"); - } - - boost::property_tree::ptree json; - std::stringstream body (this_l->res.body ()); - boost::property_tree::read_json (body, json); - - this_l->socket.shutdown (tcp::socket::shutdown_both, ec); - if (ec && ec != boost::system::errc::not_connected) - { - fail (ec, "shutdown"); - } - else - { - return this_l->callback (json); - } - }); - }); - }); - } - -private: - boost::asio::io_context & io_ctx; - socket_type socket; - boost::beast::flat_buffer buffer; - http::request req; - http::response res; boost::property_tree::ptree request; - tcp::resolver::results_type const & results; - std::function callback; -}; + request.put ("action", "send"); + request.put ("wallet", wallet); + request.put ("source", source); + request.put ("destination", destination); + request.put ("amount", "1"); + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + + req.method (http::verb::post); + req.version (11); + req.target ("/"); + req.body () = ostream.str (); + req.prepare_payload (); + + http::async_write (socket, req, yield); + http::async_read (socket, buffer, res, yield); + boost::property_tree::ptree json; + std::stringstream body (res.body ()); + boost::property_tree::read_json (body, json); + auto block = json.get ("block"); + + // Shut down send socket + boost::system::error_code ec; + socket.shutdown (tcp::socket::shutdown_both, ec); + debug_assert (!ec || ec == boost::system::errc::not_connected); + + { + // Start receive session + boost::beast::flat_buffer buffer; + http::request req; + http::response res1; + socket_type socket (io_ctx); + + boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield); + + boost::property_tree::ptree request; + request.put ("action", "receive"); + request.put ("wallet", wallet); + request.put ("account", destination); + request.put ("block", block); + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + + req.method (http::verb::post); + req.version (11); + req.target ("/"); + req.body () = ostream.str (); + req.prepare_payload (); + + http::async_write (socket, req, yield); + http::async_read (socket, buffer, res, yield); + --send_calls_remaining; + // Gracefully close the socket + boost::system::error_code ec; + socket.shutdown (tcp::socket::shutdown_both, ec); + debug_assert (!ec || ec == boost::system::errc::not_connected); + } +} boost::property_tree::ptree rpc_request (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results) { debug_assert (results.size () == 1); + std::promise> promise; - auto rpc_session (std::make_shared (request, ioc, results, [&promise](auto const & response_a) { - promise.set_value (response_a); - })); - rpc_session->run (); + boost::asio::spawn (boost::asio::io_context::strand (ioc), [&ioc, &results, request, &promise](boost::asio::yield_context yield) { + socket_type socket (ioc); + boost::beast::flat_buffer buffer; + http::request req; + http::response res; + + boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield); + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + + req.method (http::verb::post); + req.version (11); + req.target ("/"); + req.body () = ostream.str (); + req.prepare_payload (); + + http::async_write (socket, req, yield); + http::async_read (socket, buffer, res, yield); + + boost::property_tree::ptree json; + std::stringstream body (res.body ()); + boost::property_tree::read_json (body, json); + promise.set_value (json); + }); + auto future = promise.get_future (); if (future.wait_for (std::chrono::seconds (5)) != std::future_status::ready) { @@ -601,7 +456,10 @@ int main (int argc, char * const * argv) destination_account = &destination_accounts[random_account_index]; } - std::make_shared (ioc, send_calls_remaining, wallet, nano::genesis_account.to_account (), destination_account->as_string, primary_node_results)->run (); + // Send from genesis account to different accounts and receive the funds + boost::asio::spawn (boost::asio::io_context::strand (ioc), [&ioc, &primary_node_results, &wallet, &resolver, &node_count, destination_account, &send_calls_remaining](boost::asio::yield_context yield) { + send_receive (ioc, wallet, nano::genesis_account.to_account (), destination_account->as_string, send_calls_remaining, primary_node_results, yield); + }); } while (send_calls_remaining != 0) diff --git a/nano/rpc_test/CMakeLists.txt b/nano/rpc_test/CMakeLists.txt index abbb1c28..d0859929 100644 --- a/nano/rpc_test/CMakeLists.txt +++ b/nano/rpc_test/CMakeLists.txt @@ -2,7 +2,7 @@ add_executable (rpc_test entry.cpp rpc.cpp) -target_link_libraries(rpc_test node rpc test_common gtest) +target_link_libraries(rpc_test node rpc test_common gtest Boost::coroutine) target_compile_definitions(rpc_test PUBLIC diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 03ee251d..dd4e0350 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -25,24 +26,27 @@ namespace class test_response { public: - test_response (boost::property_tree::ptree const & request_a, boost::asio::io_context & io_ctx) : - request (request_a), - sock (io_ctx) + test_response (boost::property_tree::ptree const & request_a, boost::asio::io_context & io_ctx_a) : + request (request_a) { } - test_response (boost::property_tree::ptree const & request_a, uint16_t port, boost::asio::io_context & io_ctx) : - request (request_a), - sock (io_ctx) + test_response (boost::property_tree::ptree const & request_a, uint16_t port_a, boost::asio::io_context & io_ctx_a) : + request (request_a) { - run (port); + run (port_a, io_ctx_a); } - void run (uint16_t port) + void run (uint16_t port_a, boost::asio::io_context & io_ctx_a) { - sock.async_connect (nano::tcp_endpoint (boost::asio::ip::address_v6::loopback (), port), [this](boost::system::error_code const & ec) { - if (!ec) + boost::asio::spawn (io_ctx_a, [this, &io_ctx_a, port_a](boost::asio::yield_context yield) { + boost::asio::ip::tcp::socket sock (io_ctx_a); + boost::beast::flat_buffer sb; + boost::beast::http::request req; + + try { + sock.async_connect (nano::tcp_endpoint (boost::asio::ip::address_v6::loopback (), port_a), yield); std::stringstream ostream; boost::property_tree::write_json (ostream, request); req.method (boost::beast::http::verb::post); @@ -51,46 +55,27 @@ public: ostream.flush (); req.body () = ostream.str (); req.prepare_payload (); - boost::beast::http::async_write (sock, req, [this](boost::system::error_code const & ec, size_t bytes_transferred) { - if (!ec) - { - boost::beast::http::async_read (sock, sb, resp, [this](boost::system::error_code const & ec, size_t bytes_transferred) { - if (!ec) - { - std::stringstream body (resp.body ()); - try - { - boost::property_tree::read_json (body, json); - status = 200; - } - catch (std::exception &) - { - status = 500; - } - } - else - { - status = 400; - }; - }); - } - else - { - status = 600; - } - }); + boost::beast::http::async_write (sock, req, yield); + boost::beast::http::async_read (sock, sb, resp, yield); + std::stringstream body (resp.body ()); + try + { + boost::property_tree::read_json (body, json); + status = 200; + } + catch (std::exception const &) + { + status = 400; + } } - else + catch (boost::system::error_code const &) { status = 400; } }); } boost::property_tree::ptree const & request; - boost::asio::ip::tcp::socket sock; boost::property_tree::ptree json; - boost::beast::flat_buffer sb; - boost::beast::http::request req; boost::beast::http::response resp; std::atomic status{ 0 }; }; @@ -6993,8 +6978,8 @@ TEST (rpc, simultaneous_calls) std::atomic count{ num }; for (int i = 0; i < num; ++i) { - std::thread ([&test_responses, &promise, &count, i, port = rpc.config.port]() { - test_responses[i]->run (port); + std::thread ([&test_responses, &promise, &count, i, port = rpc.config.port, &io_ctx = system.io_ctx]() { + test_responses[i]->run (port, io_ctx); if (--count == 0) { promise.set_value ();