diff --git a/nano/load_test/entry.cpp b/nano/load_test/entry.cpp index 5c76cb752..a2a37123b 100644 --- a/nano/load_test/entry.cpp +++ b/nano/load_test/entry.cpp @@ -1,7 +1,5 @@ -#include #include #include -#include #include #include #include @@ -92,6 +90,222 @@ public: bool error{ false }; }; +class send_receive_impl; +class start_receive_session_impl; +class rpc_request_impl; + +class start_receive_session_impl : public std::enable_shared_from_this +{ +private: + socket_type socket; + std::atomic & send_calls_remaining; + tcp::resolver::results_type const & results; + + std::string const wallet; + std::string const source; + std::string const destination; + + std::string const block; + + boost::beast::flat_buffer buffer; + http::request req; + http::response res; + +public: + start_receive_session_impl ( + boost::asio::io_context & io_ctx_a, + tcp::resolver::results_type const & results_a, + std::string const & wallet_a, + std::string const & source_a, + std::string const & destination_a, + std::atomic & send_calls_remaining_a, + std::string const block_a) : + socket{ io_ctx_a }, + send_calls_remaining{ send_calls_remaining_a }, + results{ results_a }, + wallet{ wallet_a }, + source{ source_a }, + destination{ destination_a }, + block{ std::move (block_a) } + { + } + + void start () + { + async_connect (); + } + +private: + void async_connect () + { + boost::asio::async_connect (socket, results.cbegin (), results.cend (), + [this_l = shared_from_this ()] (boost::system::error_code const & ec, tcp::resolver::iterator iterator) { + this_l->request_receive (); + }); + } + + void request_receive () + { + boost::property_tree::ptree request; + request.put ("action", "receive"); + request.put ("wallet", wallet); + request.put ("account", destination); + request.put ("block", block); + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + + req.method (http::verb::post); + req.version (11); + req.target ("/"); + req.body () = ostream.str (); + req.prepare_payload (); + + async_write (); + } + + void async_write () + { + http::async_write (socket, req, + [this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) { + debug_assert (!error_code); + debug_assert (bytes_transferred > 0); + this_l->async_read (); + }); + } + + void async_read () + { + http::async_read (socket, buffer, res, + [this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) { + debug_assert (!error_code); + debug_assert (bytes_transferred > 0); + --this_l->send_calls_remaining; + this_l->socket_shutdown (); + }); + } + + void socket_shutdown () + { + // Gracefully close the socket + boost::system::error_code ec; + socket.shutdown (tcp::socket::shutdown_both, ec); + debug_assert (!ec || ec == boost::system::errc::not_connected); + } +}; + +class send_receive_impl : public std::enable_shared_from_this +{ +private: + boost::asio::io_context & io_ctx; + socket_type socket; + + std::string const wallet; + std::string const source; + std::string const destination; + + std::atomic & send_calls_remaining; + tcp::resolver::results_type const results; + + boost::beast::flat_buffer buffer; + http::request req; + http::response res; + + std::shared_ptr start_receive_session = nullptr; + +public: + send_receive_impl ( + boost::asio::io_context & io_ctx_a, + std::string const & wallet_a, + std::string const & source_a, + std::string const & destination_a, + std::atomic & send_calls_remaining_a, + tcp::resolver::results_type const & results_a) : + io_ctx{ io_ctx_a }, + socket{ io_ctx }, + wallet{ wallet_a }, + source{ source_a }, + destination{ destination_a }, + send_calls_remaining{ send_calls_remaining_a }, + results{ results_a } + { + } + + void start () + { + async_connect (); + } + +private: + void async_connect () + { + boost::asio::async_connect (socket, results.cbegin (), results.cend (), + [this_l = shared_from_this ()] (boost::system::error_code const & ec, tcp::resolver::iterator iterator) { + this_l->request_send (); + }); + } + + void request_send () + { + boost::property_tree::ptree request; + request.put ("action", "send"); + request.put ("wallet", wallet); + request.put ("source", source); + request.put ("destination", destination); + request.put ("amount", "1"); + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + + req.method (http::verb::post); + req.version (11); + req.target ("/"); + req.body () = ostream.str (); + req.prepare_payload (); + + async_write (); + } + + void async_write () + { + http::async_write (socket, req, + [this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) { + debug_assert (!error_code); + debug_assert (bytes_transferred > 0); + this_l->async_read (); + }); + } + + void async_read () + { + http::async_read (socket, buffer, res, + [this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) { + debug_assert (!error_code); + debug_assert (bytes_transferred > 0); + this_l->receive_start (); + this_l->socket_shutdown (); + }); + } + + void socket_shutdown () + { + // Shut down send socket + boost::system::error_code ec; + socket.shutdown (tcp::socket::shutdown_both, ec); + debug_assert (!ec || ec == boost::system::errc::not_connected); + } + + void receive_start () + { + boost::property_tree::ptree json; + std::stringstream body (res.body ()); + boost::property_tree::read_json (body, json); + auto block = json.get ("block"); + + start_receive_session = std::make_shared ( + io_ctx, results, wallet, source, destination, send_calls_remaining, block); + start_receive_session->start (); + } +}; + class rpc_request_impl : public std::enable_shared_from_this { private: @@ -188,75 +402,6 @@ private: } }; -void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet, std::string const & source, std::string const & destination, std::atomic & send_calls_remaining, tcp::resolver::results_type const & results, boost::asio::yield_context yield) -{ - boost::beast::flat_buffer buffer; - http::request req; - http::response res; - socket_type socket (io_ctx); - - boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield); - - boost::property_tree::ptree request; - request.put ("action", "send"); - request.put ("wallet", wallet); - request.put ("source", source); - request.put ("destination", destination); - request.put ("amount", "1"); - std::stringstream ostream; - boost::property_tree::write_json (ostream, request); - - req.method (http::verb::post); - req.version (11); - req.target ("/"); - req.body () = ostream.str (); - req.prepare_payload (); - - http::async_write (socket, req, yield); - http::async_read (socket, buffer, res, yield); - boost::property_tree::ptree json; - std::stringstream body (res.body ()); - boost::property_tree::read_json (body, json); - auto block = json.get ("block"); - - // Shut down send socket - boost::system::error_code ec; - socket.shutdown (tcp::socket::shutdown_both, ec); - debug_assert (!ec || ec == boost::system::errc::not_connected); - - { - // Start receive session - boost::beast::flat_buffer buffer; - http::request req; - http::response res1; - socket_type socket (io_ctx); - - boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield); - - boost::property_tree::ptree request; - request.put ("action", "receive"); - request.put ("wallet", wallet); - request.put ("account", destination); - request.put ("block", block); - std::stringstream ostream; - boost::property_tree::write_json (ostream, request); - - req.method (http::verb::post); - req.version (11); - req.target ("/"); - req.body () = ostream.str (); - req.prepare_payload (); - - http::async_write (socket, req, yield); - http::async_read (socket, buffer, res, yield); - --send_calls_remaining; - // Gracefully close the socket - boost::system::error_code ec; - socket.shutdown (tcp::socket::shutdown_both, ec); - debug_assert (!ec || ec == boost::system::errc::not_connected); - } -} - boost::property_tree::ptree rpc_request (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results) { auto rpc_request = std::make_shared (request, ioc, results); @@ -490,7 +635,6 @@ int main (int argc, char * const * argv) std::uniform_int_distribution dist (0, destination_accounts.size () - 1); std::atomic send_calls_remaining{ send_count }; - for (auto i = 0; i < send_count; ++i) { account * destination_account; @@ -505,8 +649,11 @@ int main (int argc, char * const * argv) } // Send from genesis account to different accounts and receive the funds - boost::asio::spawn (ioc, [&ioc, &primary_node_results, &wallet, destination_account, &send_calls_remaining] (boost::asio::yield_context yield) { - send_receive (ioc, wallet, nano::dev::genesis->account ().to_account (), destination_account->as_string, send_calls_remaining, primary_node_results, yield); + auto send_receive = std::make_shared (ioc, wallet, nano::dev::genesis->account ().to_account (), destination_account->as_string, send_calls_remaining, primary_node_results); + boost::asio::strand strand{ ioc.get_executor () }; + boost::asio::post (strand, + [send_receive] () { + send_receive->start (); }); }