Merge PR Move from boost spawn/yield coroutine on load_test to async callbacks (#4140)

Move from boost spawn/yield coroutine on load_test to async callbacks
This commit is contained in:
Thiago Silva 2023-02-16 19:52:25 -03:00 committed by GitHub
commit 65941a0632
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,7 +1,6 @@
#include <nano/boost/asio/bind_executor.hpp>
#include <nano/boost/asio/connect.hpp>
#include <nano/boost/asio/ip/tcp.hpp>
#include <nano/boost/asio/spawn.hpp>
#include <nano/boost/asio/strand.hpp>
#include <nano/boost/beast/core/flat_buffer.hpp>
#include <nano/boost/beast/http.hpp>
#include <nano/boost/process/child.hpp>
@ -18,6 +17,7 @@
#include <csignal>
#include <future>
#include <iomanip>
#include <memory>
#include <random>
/* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */
@ -90,51 +90,62 @@ public:
bool error{ false };
};
void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet, std::string const & source, std::string const & destination, std::atomic<int> & send_calls_remaining, tcp::resolver::results_type const & results, boost::asio::yield_context yield)
class send_receive_impl;
class start_receive_session_impl;
class rpc_request_impl;
class start_receive_session_impl : public std::enable_shared_from_this<start_receive_session_impl>
{
private:
socket_type socket;
std::atomic<int> & send_calls_remaining;
tcp::resolver::results_type const & results;
std::string const wallet;
std::string const source;
std::string const destination;
std::string const block;
boost::beast::flat_buffer buffer;
http::request<http::string_body> req;
http::response<http::string_body> res;
socket_type socket (io_ctx);
boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield);
boost::property_tree::ptree request;
request.put ("action", "send");
request.put ("wallet", wallet);
request.put ("source", source);
request.put ("destination", destination);
request.put ("amount", "1");
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
req.method (http::verb::post);
req.version (11);
req.target ("/");
req.body () = ostream.str ();
req.prepare_payload ();
http::async_write (socket, req, yield);
http::async_read (socket, buffer, res, yield);
boost::property_tree::ptree json;
std::stringstream body (res.body ());
boost::property_tree::read_json (body, json);
auto block = json.get<std::string> ("block");
// Shut down send socket
boost::system::error_code ec;
socket.shutdown (tcp::socket::shutdown_both, ec);
debug_assert (!ec || ec == boost::system::errc::not_connected);
public:
start_receive_session_impl (
boost::asio::io_context & io_ctx_a,
tcp::resolver::results_type const & results_a,
std::string const & wallet_a,
std::string const & source_a,
std::string const & destination_a,
std::atomic<int> & send_calls_remaining_a,
std::string const block_a) :
socket{ io_ctx_a },
send_calls_remaining{ send_calls_remaining_a },
results{ results_a },
wallet{ wallet_a },
source{ source_a },
destination{ destination_a },
block{ std::move (block_a) }
{
// Start receive session
boost::beast::flat_buffer buffer;
http::request<http::string_body> req;
http::response<http::string_body> res1;
socket_type socket (io_ctx);
}
boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield);
void start ()
{
async_connect ();
}
private:
void async_connect ()
{
boost::asio::async_connect (socket, results.cbegin (), results.cend (),
[this_l = shared_from_this ()] (boost::system::error_code const & ec, tcp::resolver::iterator iterator) {
this_l->request_receive ();
});
}
void request_receive ()
{
boost::property_tree::ptree request;
request.put ("action", "receive");
request.put ("wallet", wallet);
@ -149,28 +160,98 @@ void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet,
req.body () = ostream.str ();
req.prepare_payload ();
http::async_write (socket, req, yield);
http::async_read (socket, buffer, res, yield);
--send_calls_remaining;
async_write ();
}
void async_write ()
{
http::async_write (socket, req,
[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
debug_assert (!error_code);
debug_assert (bytes_transferred > 0);
this_l->async_read ();
});
}
void async_read ()
{
http::async_read (socket, buffer, res,
[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
debug_assert (!error_code);
debug_assert (bytes_transferred > 0);
--this_l->send_calls_remaining;
this_l->socket_shutdown ();
});
}
void socket_shutdown ()
{
// Gracefully close the socket
boost::system::error_code ec;
socket.shutdown (tcp::socket::shutdown_both, ec);
debug_assert (!ec || ec == boost::system::errc::not_connected);
}
}
};
boost::property_tree::ptree rpc_request (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results)
class send_receive_impl : public std::enable_shared_from_this<send_receive_impl>
{
debug_assert (results.size () == 1);
private:
boost::asio::io_context & io_ctx;
socket_type socket;
std::promise<boost::optional<boost::property_tree::ptree>> promise;
boost::asio::spawn (ioc, [&ioc, &results, request, &promise] (boost::asio::yield_context yield) {
socket_type socket (ioc);
boost::beast::flat_buffer buffer;
http::request<http::string_body> req;
http::response<http::string_body> res;
std::string const wallet;
std::string const source;
std::string const destination;
boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield);
std::atomic<int> & send_calls_remaining;
tcp::resolver::results_type const results;
boost::beast::flat_buffer buffer;
http::request<http::string_body> req;
http::response<http::string_body> res;
std::shared_ptr<start_receive_session_impl> start_receive_session = nullptr;
public:
send_receive_impl (
boost::asio::io_context & io_ctx_a,
std::string const & wallet_a,
std::string const & source_a,
std::string const & destination_a,
std::atomic<int> & send_calls_remaining_a,
tcp::resolver::results_type const & results_a) :
io_ctx{ io_ctx_a },
socket{ io_ctx },
wallet{ wallet_a },
source{ source_a },
destination{ destination_a },
send_calls_remaining{ send_calls_remaining_a },
results{ results_a }
{
}
void start ()
{
async_connect ();
}
private:
void async_connect ()
{
boost::asio::async_connect (socket, results.cbegin (), results.cend (),
[this_l = shared_from_this ()] (boost::system::error_code const & ec, tcp::resolver::iterator iterator) {
this_l->request_send ();
});
}
void request_send ()
{
boost::property_tree::ptree request;
request.put ("action", "send");
request.put ("wallet", wallet);
request.put ("source", source);
request.put ("destination", destination);
request.put ("amount", "1");
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
@ -180,23 +261,156 @@ boost::property_tree::ptree rpc_request (boost::property_tree::ptree const & req
req.body () = ostream.str ();
req.prepare_payload ();
http::async_write (socket, req, yield);
http::async_read (socket, buffer, res, yield);
async_write ();
}
void async_write ()
{
http::async_write (socket, req,
[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
debug_assert (!error_code);
debug_assert (bytes_transferred > 0);
this_l->async_read ();
});
}
void async_read ()
{
http::async_read (socket, buffer, res,
[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
debug_assert (!error_code);
debug_assert (bytes_transferred > 0);
this_l->receive_start ();
this_l->socket_shutdown ();
});
}
void socket_shutdown ()
{
// Shut down send socket
boost::system::error_code ec;
socket.shutdown (tcp::socket::shutdown_both, ec);
debug_assert (!ec || ec == boost::system::errc::not_connected);
}
void receive_start ()
{
boost::property_tree::ptree json;
std::stringstream body (res.body ());
boost::property_tree::read_json (body, json);
auto block = json.get<std::string> ("block");
start_receive_session = std::make_shared<start_receive_session_impl> (
io_ctx, results, wallet, source, destination, send_calls_remaining, block);
start_receive_session->start ();
}
};
class rpc_request_impl : public std::enable_shared_from_this<rpc_request_impl>
{
private:
boost::property_tree::ptree const request;
boost::asio::io_context & ioc;
tcp::resolver::results_type const results;
socket_type socket;
boost::beast::flat_buffer buffer;
http::request<http::string_body> req;
http::response<http::string_body> res;
std::promise<boost::optional<boost::property_tree::ptree>> promise;
public:
rpc_request_impl (
boost::property_tree::ptree const & request_a,
boost::asio::io_context & ioc_a,
tcp::resolver::results_type const & results_a) :
request{ request_a },
ioc{ ioc_a },
results{ results_a },
socket{ ioc }
{
debug_assert (results.size () == 1);
}
void start ()
{
async_connect ();
}
boost::property_tree::ptree value_get ()
{
auto future = promise.get_future ();
if (future.wait_for (std::chrono::seconds (5)) != std::future_status::ready)
{
throw std::runtime_error ("RPC request timed out");
}
auto response = future.get ();
debug_assert (response.is_initialized ());
return response.value_or (decltype (response)::argument_type{});
}
private:
void async_connect ()
{
boost::asio::async_connect (socket, results.cbegin (), results.cend (),
[this_l = shared_from_this ()] (boost::system::error_code const & ec, tcp::resolver::iterator iterator) {
this_l->request_do ();
});
}
void request_do ()
{
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
req.method (http::verb::post);
req.version (11);
req.target ("/");
req.body () = ostream.str ();
req.prepare_payload ();
async_write ();
}
void async_write ()
{
http::async_write (socket, req,
[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
debug_assert (!error_code);
debug_assert (bytes_transferred > 0);
this_l->async_read ();
});
}
void async_read ()
{
http::async_read (socket, buffer, res,
[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
debug_assert (!error_code);
debug_assert (bytes_transferred > 0);
this_l->value_set ();
});
}
void value_set ()
{
boost::property_tree::ptree json;
std::stringstream body (res.body ());
boost::property_tree::read_json (body, json);
promise.set_value (json);
});
auto future = promise.get_future ();
if (future.wait_for (std::chrono::seconds (5)) != std::future_status::ready)
{
throw std::runtime_error ("RPC request timed out");
}
auto response = future.get ();
debug_assert (response.is_initialized ());
return response.value_or (decltype (response)::argument_type{});
};
boost::property_tree::ptree rpc_request (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results)
{
auto rpc_request = std::make_shared<rpc_request_impl> (request, ioc, results);
boost::asio::strand<boost::asio::io_context::executor_type> strand{ ioc.get_executor () };
boost::asio::post (strand,
[rpc_request] () {
rpc_request->start ();
});
return rpc_request->value_get ();
}
void keepalive_rpc (boost::asio::io_context & ioc, tcp::resolver::results_type const & results, uint16_t port)
@ -421,7 +635,6 @@ int main (int argc, char * const * argv)
std::uniform_int_distribution<size_t> dist (0, destination_accounts.size () - 1);
std::atomic<int> send_calls_remaining{ send_count };
for (auto i = 0; i < send_count; ++i)
{
account * destination_account;
@ -436,8 +649,11 @@ int main (int argc, char * const * argv)
}
// Send from genesis account to different accounts and receive the funds
boost::asio::spawn (ioc, [&ioc, &primary_node_results, &wallet, destination_account, &send_calls_remaining] (boost::asio::yield_context yield) {
send_receive (ioc, wallet, nano::dev::genesis->account ().to_account (), destination_account->as_string, send_calls_remaining, primary_node_results, yield);
auto send_receive = std::make_shared<send_receive_impl> (ioc, wallet, nano::dev::genesis->account ().to_account (), destination_account->as_string, send_calls_remaining, primary_node_results);
boost::asio::strand<boost::asio::io_context::executor_type> strand{ ioc.get_executor () };
boost::asio::post (strand,
[send_receive] () {
send_receive->start ();
});
}