Use Boost.Coroutine with asio/beast async calls in tests (#2788)

* Use Boost.Coroutine in tests

* Change rpc_session to use coroutines

* Formatting
This commit is contained in:
Wesley Shillingford 2020-09-02 11:38:34 +01:00 committed by GitHub
commit 258a39b30c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 132 additions and 280 deletions

View file

@ -426,6 +426,8 @@ if (NANO_FUZZER_TEST)
endif ()
if (NANO_TEST OR RAIBLOCKS_TEST)
find_package (Boost 1.69.0 REQUIRED COMPONENTS coroutine)
if(WIN32)
if(MSVC_VERSION)
if(MSVC_VERSION GREATER_EQUAL 1910)

View file

@ -0,0 +1,7 @@
#pragma once
#include <nano/boost/private/macro_warnings.hpp>
DISABLE_ASIO_WARNINGS
#include <boost/asio/spawn.hpp>
REENABLE_WARNINGS

View file

@ -1,7 +1,7 @@
add_executable (load_test
entry.cpp)
target_link_libraries (load_test node secure test_common gtest Boost::boost)
target_link_libraries (load_test node secure test_common gtest Boost::boost Boost::coroutine)
target_compile_definitions(load_test
PRIVATE

View file

@ -1,7 +1,7 @@
#include <nano/boost/asio/bind_executor.hpp>
#include <nano/boost/asio/connect.hpp>
#include <nano/boost/asio/ip/tcp.hpp>
#include <nano/boost/asio/strand.hpp>
#include <nano/boost/asio/spawn.hpp>
#include <nano/boost/beast/core/flat_buffer.hpp>
#include <nano/boost/beast/http.hpp>
#include <nano/boost/process/child.hpp>
@ -67,12 +67,6 @@ void write_config_files (boost::filesystem::path const & data_path, int index)
toml_rpc.write (nano::get_rpc_toml_config_path (data_path));
}
// Report a failure
void fail (boost::system::error_code ec, char const * what)
{
std::cerr << what << ": " << ec.message () << "\n";
}
class account final
{
public:
@ -95,244 +89,105 @@ public:
bool error{ false };
};
class receive_session final : public std::enable_shared_from_this<receive_session>
void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet, std::string const & source, std::string const & destination, std::atomic<int> & send_calls_remaining, tcp::resolver::results_type const & results, boost::asio::yield_context yield)
{
public:
receive_session (boost::asio::io_context & ioc, std::atomic<int> & send_calls_remaining, std::string const & wallet, std::string const & account, std::string const & block, tcp::resolver::results_type const & results) :
socket (ioc),
strand (socket.get_executor ()),
send_calls_remaining (send_calls_remaining),
wallet (wallet),
account (account),
block (block),
results (results)
{
}
void run ()
{
auto this_l (shared_from_this ());
boost::asio::async_connect (this_l->socket, this_l->results.cbegin (), this_l->results.cend (), boost::asio::bind_executor (strand, [this_l](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator) {
if (ec)
{
return fail (ec, "connect");
}
boost::property_tree::ptree request;
request.put ("action", "receive");
request.put ("wallet", this_l->wallet);
request.put ("account", this_l->account);
request.put ("block", this_l->block);
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
this_l->req.method (http::verb::post);
this_l->req.version (11);
this_l->req.target ("/");
this_l->req.body () = ostream.str ();
this_l->req.prepare_payload ();
http::async_write (this_l->socket, this_l->req, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code ec, std::size_t) {
if (ec)
{
return fail (ec, "write");
}
http::async_read (this_l->socket, this_l->buffer, this_l->res, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code ec, std::size_t) {
if (ec)
{
return fail (ec, "read");
}
--this_l->send_calls_remaining;
// Gracefully close the socket
this_l->socket.shutdown (tcp::socket::shutdown_both, ec);
if (ec && ec != boost::system::errc::not_connected)
{
return fail (ec, "shutdown");
}
}));
}));
}));
}
private:
socket_type socket;
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::beast::flat_buffer buffer;
http::request<http::string_body> req;
http::response<http::string_body> res;
std::atomic<int> & send_calls_remaining;
std::string wallet;
std::string account;
std::string block;
tcp::resolver::results_type const & results;
};
socket_type socket (io_ctx);
class send_session final : public std::enable_shared_from_this<send_session>
{
public:
send_session (boost::asio::io_context & ioc, std::atomic<int> & send_calls_remaining, std::string const & wallet, std::string const & source, std::string const & destination, tcp::resolver::results_type const & results) :
io_ctx (ioc),
socket (ioc),
strand (socket.get_executor ()),
send_calls_remaining (send_calls_remaining),
wallet (wallet),
source (source),
destination (destination),
results (results)
{
}
boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield);
void run ()
{
auto this_l (shared_from_this ());
boost::asio::async_connect (this_l->socket, this_l->results.cbegin (), this_l->results.cend (), boost::asio::bind_executor (strand, [this_l](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator) {
if (ec)
{
return fail (ec, "connect");
}
boost::property_tree::ptree request;
request.put ("action", "send");
request.put ("wallet", this_l->wallet);
request.put ("source", this_l->source);
request.put ("destination", this_l->destination);
request.put ("amount", "1");
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
this_l->req.method (http::verb::post);
this_l->req.version (11);
this_l->req.target ("/");
this_l->req.body () = ostream.str ();
this_l->req.prepare_payload ();
http::async_write (this_l->socket, this_l->req, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code ec, std::size_t) {
if (ec)
{
return fail (ec, "write");
}
http::async_read (this_l->socket, this_l->buffer, this_l->res, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code ec, std::size_t) {
if (ec)
{
return fail (ec, "read");
}
boost::property_tree::ptree json;
std::stringstream body (this_l->res.body ());
boost::property_tree::read_json (body, json);
auto block = json.get<std::string> ("block");
std::make_shared<receive_session> (this_l->io_ctx, this_l->send_calls_remaining, this_l->wallet, this_l->destination, block, this_l->results)->run ();
this_l->socket.shutdown (tcp::socket::shutdown_both, ec);
if (ec && ec != boost::system::errc::not_connected)
{
return fail (ec, "shutdown");
}
}));
}));
}));
}
private:
boost::asio::io_context & io_ctx;
socket_type socket;
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::beast::flat_buffer buffer;
http::request<http::string_body> req;
http::response<http::string_body> res;
std::atomic<int> & send_calls_remaining;
std::string wallet;
std::string source;
std::string destination;
tcp::resolver::results_type const & results;
};
class rpc_session final : public std::enable_shared_from_this<rpc_session>
{
public:
rpc_session (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results, std::function<void(boost::property_tree::ptree const &)> callback) :
io_ctx (ioc),
socket (ioc),
request (request),
results (results),
callback (callback)
{
}
void run ()
{
auto this_l (shared_from_this ());
boost::asio::async_connect (this_l->socket, this_l->results.cbegin (), this_l->results.cend (), [this_l](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator) {
if (ec)
{
fail (ec, "connect");
}
std::stringstream ostream;
boost::property_tree::write_json (ostream, this_l->request);
this_l->req.method (http::verb::post);
this_l->req.version (11);
this_l->req.target ("/");
this_l->req.body () = ostream.str ();
this_l->req.prepare_payload ();
http::async_write (this_l->socket, this_l->req, [this_l](boost::system::error_code ec, std::size_t) {
if (ec)
{
fail (ec, "write");
}
http::async_read (this_l->socket, this_l->buffer, this_l->res, [this_l](boost::system::error_code ec, std::size_t) {
if (ec)
{
fail (ec, "read");
}
boost::property_tree::ptree json;
std::stringstream body (this_l->res.body ());
boost::property_tree::read_json (body, json);
this_l->socket.shutdown (tcp::socket::shutdown_both, ec);
if (ec && ec != boost::system::errc::not_connected)
{
fail (ec, "shutdown");
}
else
{
return this_l->callback (json);
}
});
});
});
}
private:
boost::asio::io_context & io_ctx;
socket_type socket;
boost::beast::flat_buffer buffer;
http::request<http::string_body> req;
http::response<http::string_body> res;
boost::property_tree::ptree request;
tcp::resolver::results_type const & results;
std::function<void(boost::property_tree::ptree const &)> callback;
};
request.put ("action", "send");
request.put ("wallet", wallet);
request.put ("source", source);
request.put ("destination", destination);
request.put ("amount", "1");
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
req.method (http::verb::post);
req.version (11);
req.target ("/");
req.body () = ostream.str ();
req.prepare_payload ();
http::async_write (socket, req, yield);
http::async_read (socket, buffer, res, yield);
boost::property_tree::ptree json;
std::stringstream body (res.body ());
boost::property_tree::read_json (body, json);
auto block = json.get<std::string> ("block");
// Shut down send socket
boost::system::error_code ec;
socket.shutdown (tcp::socket::shutdown_both, ec);
debug_assert (!ec || ec == boost::system::errc::not_connected);
{
// Start receive session
boost::beast::flat_buffer buffer;
http::request<http::string_body> req;
http::response<http::string_body> res1;
socket_type socket (io_ctx);
boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield);
boost::property_tree::ptree request;
request.put ("action", "receive");
request.put ("wallet", wallet);
request.put ("account", destination);
request.put ("block", block);
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
req.method (http::verb::post);
req.version (11);
req.target ("/");
req.body () = ostream.str ();
req.prepare_payload ();
http::async_write (socket, req, yield);
http::async_read (socket, buffer, res, yield);
--send_calls_remaining;
// Gracefully close the socket
boost::system::error_code ec;
socket.shutdown (tcp::socket::shutdown_both, ec);
debug_assert (!ec || ec == boost::system::errc::not_connected);
}
}
boost::property_tree::ptree rpc_request (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results)
{
debug_assert (results.size () == 1);
std::promise<boost::optional<boost::property_tree::ptree>> promise;
auto rpc_session (std::make_shared<rpc_session> (request, ioc, results, [&promise](auto const & response_a) {
promise.set_value (response_a);
}));
rpc_session->run ();
boost::asio::spawn (boost::asio::io_context::strand (ioc), [&ioc, &results, request, &promise](boost::asio::yield_context yield) {
socket_type socket (ioc);
boost::beast::flat_buffer buffer;
http::request<http::string_body> req;
http::response<http::string_body> res;
boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield);
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
req.method (http::verb::post);
req.version (11);
req.target ("/");
req.body () = ostream.str ();
req.prepare_payload ();
http::async_write (socket, req, yield);
http::async_read (socket, buffer, res, yield);
boost::property_tree::ptree json;
std::stringstream body (res.body ());
boost::property_tree::read_json (body, json);
promise.set_value (json);
});
auto future = promise.get_future ();
if (future.wait_for (std::chrono::seconds (5)) != std::future_status::ready)
{
@ -601,7 +456,10 @@ int main (int argc, char * const * argv)
destination_account = &destination_accounts[random_account_index];
}
std::make_shared<send_session> (ioc, send_calls_remaining, wallet, nano::genesis_account.to_account (), destination_account->as_string, primary_node_results)->run ();
// Send from genesis account to different accounts and receive the funds
boost::asio::spawn (boost::asio::io_context::strand (ioc), [&ioc, &primary_node_results, &wallet, &resolver, &node_count, destination_account, &send_calls_remaining](boost::asio::yield_context yield) {
send_receive (ioc, wallet, nano::genesis_account.to_account (), destination_account->as_string, send_calls_remaining, primary_node_results, yield);
});
}
while (send_calls_remaining != 0)

View file

@ -2,7 +2,7 @@ add_executable (rpc_test
entry.cpp
rpc.cpp)
target_link_libraries(rpc_test node rpc test_common gtest)
target_link_libraries(rpc_test node rpc test_common gtest Boost::coroutine)
target_compile_definitions(rpc_test
PUBLIC

View file

@ -1,3 +1,4 @@
#include <nano/boost/asio/spawn.hpp>
#include <nano/boost/beast/core/flat_buffer.hpp>
#include <nano/boost/beast/http.hpp>
#include <nano/lib/rpcconfig.hpp>
@ -25,24 +26,27 @@ namespace
class test_response
{
public:
test_response (boost::property_tree::ptree const & request_a, boost::asio::io_context & io_ctx) :
request (request_a),
sock (io_ctx)
test_response (boost::property_tree::ptree const & request_a, boost::asio::io_context & io_ctx_a) :
request (request_a)
{
}
test_response (boost::property_tree::ptree const & request_a, uint16_t port, boost::asio::io_context & io_ctx) :
request (request_a),
sock (io_ctx)
test_response (boost::property_tree::ptree const & request_a, uint16_t port_a, boost::asio::io_context & io_ctx_a) :
request (request_a)
{
run (port);
run (port_a, io_ctx_a);
}
void run (uint16_t port)
void run (uint16_t port_a, boost::asio::io_context & io_ctx_a)
{
sock.async_connect (nano::tcp_endpoint (boost::asio::ip::address_v6::loopback (), port), [this](boost::system::error_code const & ec) {
if (!ec)
boost::asio::spawn (io_ctx_a, [this, &io_ctx_a, port_a](boost::asio::yield_context yield) {
boost::asio::ip::tcp::socket sock (io_ctx_a);
boost::beast::flat_buffer sb;
boost::beast::http::request<boost::beast::http::string_body> req;
try
{
sock.async_connect (nano::tcp_endpoint (boost::asio::ip::address_v6::loopback (), port_a), yield);
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
req.method (boost::beast::http::verb::post);
@ -51,46 +55,27 @@ public:
ostream.flush ();
req.body () = ostream.str ();
req.prepare_payload ();
boost::beast::http::async_write (sock, req, [this](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
boost::beast::http::async_read (sock, sb, resp, [this](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
std::stringstream body (resp.body ());
try
{
boost::property_tree::read_json (body, json);
status = 200;
}
catch (std::exception &)
{
status = 500;
}
}
else
{
status = 400;
};
});
}
else
{
status = 600;
}
});
boost::beast::http::async_write (sock, req, yield);
boost::beast::http::async_read (sock, sb, resp, yield);
std::stringstream body (resp.body ());
try
{
boost::property_tree::read_json (body, json);
status = 200;
}
catch (std::exception const &)
{
status = 400;
}
}
else
catch (boost::system::error_code const &)
{
status = 400;
}
});
}
boost::property_tree::ptree const & request;
boost::asio::ip::tcp::socket sock;
boost::property_tree::ptree json;
boost::beast::flat_buffer sb;
boost::beast::http::request<boost::beast::http::string_body> req;
boost::beast::http::response<boost::beast::http::string_body> resp;
std::atomic<int> status{ 0 };
};
@ -6993,8 +6978,8 @@ TEST (rpc, simultaneous_calls)
std::atomic<int> count{ num };
for (int i = 0; i < num; ++i)
{
std::thread ([&test_responses, &promise, &count, i, port = rpc.config.port]() {
test_responses[i]->run (port);
std::thread ([&test_responses, &promise, &count, i, port = rpc.config.port, &io_ctx = system.io_ctx]() {
test_responses[i]->run (port, io_ctx);
if (--count == 0)
{
promise.set_value ();