From 11b20a76677baad0614df69f4e38b92aa451e8fd Mon Sep 17 00:00:00 2001 From: Thiago Silva Date: Thu, 16 Feb 2023 16:33:29 -0300 Subject: [PATCH] Move rpc_request coroutine implementation to async callback style --- nano/load_test/entry.cpp | 139 +++++++++++++++++++++++++++++---------- 1 file changed, 104 insertions(+), 35 deletions(-) diff --git a/nano/load_test/entry.cpp b/nano/load_test/entry.cpp index 3aa17fda4..5c76cb752 100644 --- a/nano/load_test/entry.cpp +++ b/nano/load_test/entry.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -18,6 +19,7 @@ #include #include #include +#include #include /* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */ @@ -90,6 +92,102 @@ public: bool error{ false }; }; +class rpc_request_impl : public std::enable_shared_from_this +{ +private: + boost::property_tree::ptree const request; + boost::asio::io_context & ioc; + tcp::resolver::results_type const results; + socket_type socket; + + boost::beast::flat_buffer buffer; + http::request req; + http::response res; + + std::promise> promise; + +public: + rpc_request_impl ( + boost::property_tree::ptree const & request_a, + boost::asio::io_context & ioc_a, + tcp::resolver::results_type const & results_a) : + request{ request_a }, + ioc{ ioc_a }, + results{ results_a }, + socket{ ioc } + { + debug_assert (results.size () == 1); + } + + void start () + { + async_connect (); + } + + boost::property_tree::ptree value_get () + { + auto future = promise.get_future (); + if (future.wait_for (std::chrono::seconds (5)) != std::future_status::ready) + { + throw std::runtime_error ("RPC request timed out"); + } + auto response = future.get (); + debug_assert (response.is_initialized ()); + return response.value_or (decltype (response)::argument_type{}); + } + +private: + void async_connect () + { + boost::asio::async_connect (socket, results.cbegin (), results.cend (), + [this_l = shared_from_this ()] (boost::system::error_code const & ec, tcp::resolver::iterator iterator) { + this_l->request_do (); + }); + } + + void request_do () + { + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + + req.method (http::verb::post); + req.version (11); + req.target ("/"); + req.body () = ostream.str (); + req.prepare_payload (); + + async_write (); + } + + void async_write () + { + http::async_write (socket, req, + [this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) { + debug_assert (!error_code); + debug_assert (bytes_transferred > 0); + this_l->async_read (); + }); + } + + void async_read () + { + http::async_read (socket, buffer, res, + [this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) { + debug_assert (!error_code); + debug_assert (bytes_transferred > 0); + this_l->value_set (); + }); + } + + void value_set () + { + boost::property_tree::ptree json; + std::stringstream body (res.body ()); + boost::property_tree::read_json (body, json); + promise.set_value (json); + } +}; + void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet, std::string const & source, std::string const & destination, std::atomic & send_calls_remaining, tcp::resolver::results_type const & results, boost::asio::yield_context yield) { boost::beast::flat_buffer buffer; @@ -161,42 +259,13 @@ void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet, boost::property_tree::ptree rpc_request (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results) { - debug_assert (results.size () == 1); - - std::promise> promise; - boost::asio::spawn (ioc, [&ioc, &results, request, &promise] (boost::asio::yield_context yield) { - socket_type socket (ioc); - boost::beast::flat_buffer buffer; - http::request req; - http::response res; - - boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield); - std::stringstream ostream; - boost::property_tree::write_json (ostream, request); - - req.method (http::verb::post); - req.version (11); - req.target ("/"); - req.body () = ostream.str (); - req.prepare_payload (); - - http::async_write (socket, req, yield); - http::async_read (socket, buffer, res, yield); - - boost::property_tree::ptree json; - std::stringstream body (res.body ()); - boost::property_tree::read_json (body, json); - promise.set_value (json); + auto rpc_request = std::make_shared (request, ioc, results); + boost::asio::strand strand{ ioc.get_executor () }; + boost::asio::post (strand, + [rpc_request] () { + rpc_request->start (); }); - - auto future = promise.get_future (); - if (future.wait_for (std::chrono::seconds (5)) != std::future_status::ready) - { - throw std::runtime_error ("RPC request timed out"); - } - auto response = future.get (); - debug_assert (response.is_initialized ()); - return response.value_or (decltype (response)::argument_type{}); + return rpc_request->value_get (); } void keepalive_rpc (boost::asio::io_context & ioc, tcp::resolver::results_type const & results, uint16_t port)