Move rpc_request coroutine implementation to async callback style
This commit is contained in:
parent
8c2e46bc49
commit
11b20a7667
1 changed files with 104 additions and 35 deletions
|
|
@ -2,6 +2,7 @@
|
||||||
#include <nano/boost/asio/connect.hpp>
|
#include <nano/boost/asio/connect.hpp>
|
||||||
#include <nano/boost/asio/ip/tcp.hpp>
|
#include <nano/boost/asio/ip/tcp.hpp>
|
||||||
#include <nano/boost/asio/spawn.hpp>
|
#include <nano/boost/asio/spawn.hpp>
|
||||||
|
#include <nano/boost/asio/strand.hpp>
|
||||||
#include <nano/boost/beast/core/flat_buffer.hpp>
|
#include <nano/boost/beast/core/flat_buffer.hpp>
|
||||||
#include <nano/boost/beast/http.hpp>
|
#include <nano/boost/beast/http.hpp>
|
||||||
#include <nano/boost/process/child.hpp>
|
#include <nano/boost/process/child.hpp>
|
||||||
|
|
@ -18,6 +19,7 @@
|
||||||
#include <csignal>
|
#include <csignal>
|
||||||
#include <future>
|
#include <future>
|
||||||
#include <iomanip>
|
#include <iomanip>
|
||||||
|
#include <memory>
|
||||||
#include <random>
|
#include <random>
|
||||||
|
|
||||||
/* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */
|
/* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */
|
||||||
|
|
@ -90,6 +92,102 @@ public:
|
||||||
bool error{ false };
|
bool error{ false };
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class rpc_request_impl : public std::enable_shared_from_this<rpc_request_impl>
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
boost::property_tree::ptree const request;
|
||||||
|
boost::asio::io_context & ioc;
|
||||||
|
tcp::resolver::results_type const results;
|
||||||
|
socket_type socket;
|
||||||
|
|
||||||
|
boost::beast::flat_buffer buffer;
|
||||||
|
http::request<http::string_body> req;
|
||||||
|
http::response<http::string_body> res;
|
||||||
|
|
||||||
|
std::promise<boost::optional<boost::property_tree::ptree>> promise;
|
||||||
|
|
||||||
|
public:
|
||||||
|
rpc_request_impl (
|
||||||
|
boost::property_tree::ptree const & request_a,
|
||||||
|
boost::asio::io_context & ioc_a,
|
||||||
|
tcp::resolver::results_type const & results_a) :
|
||||||
|
request{ request_a },
|
||||||
|
ioc{ ioc_a },
|
||||||
|
results{ results_a },
|
||||||
|
socket{ ioc }
|
||||||
|
{
|
||||||
|
debug_assert (results.size () == 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
void start ()
|
||||||
|
{
|
||||||
|
async_connect ();
|
||||||
|
}
|
||||||
|
|
||||||
|
boost::property_tree::ptree value_get ()
|
||||||
|
{
|
||||||
|
auto future = promise.get_future ();
|
||||||
|
if (future.wait_for (std::chrono::seconds (5)) != std::future_status::ready)
|
||||||
|
{
|
||||||
|
throw std::runtime_error ("RPC request timed out");
|
||||||
|
}
|
||||||
|
auto response = future.get ();
|
||||||
|
debug_assert (response.is_initialized ());
|
||||||
|
return response.value_or (decltype (response)::argument_type{});
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void async_connect ()
|
||||||
|
{
|
||||||
|
boost::asio::async_connect (socket, results.cbegin (), results.cend (),
|
||||||
|
[this_l = shared_from_this ()] (boost::system::error_code const & ec, tcp::resolver::iterator iterator) {
|
||||||
|
this_l->request_do ();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void request_do ()
|
||||||
|
{
|
||||||
|
std::stringstream ostream;
|
||||||
|
boost::property_tree::write_json (ostream, request);
|
||||||
|
|
||||||
|
req.method (http::verb::post);
|
||||||
|
req.version (11);
|
||||||
|
req.target ("/");
|
||||||
|
req.body () = ostream.str ();
|
||||||
|
req.prepare_payload ();
|
||||||
|
|
||||||
|
async_write ();
|
||||||
|
}
|
||||||
|
|
||||||
|
void async_write ()
|
||||||
|
{
|
||||||
|
http::async_write (socket, req,
|
||||||
|
[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
|
||||||
|
debug_assert (!error_code);
|
||||||
|
debug_assert (bytes_transferred > 0);
|
||||||
|
this_l->async_read ();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void async_read ()
|
||||||
|
{
|
||||||
|
http::async_read (socket, buffer, res,
|
||||||
|
[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
|
||||||
|
debug_assert (!error_code);
|
||||||
|
debug_assert (bytes_transferred > 0);
|
||||||
|
this_l->value_set ();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void value_set ()
|
||||||
|
{
|
||||||
|
boost::property_tree::ptree json;
|
||||||
|
std::stringstream body (res.body ());
|
||||||
|
boost::property_tree::read_json (body, json);
|
||||||
|
promise.set_value (json);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet, std::string const & source, std::string const & destination, std::atomic<int> & send_calls_remaining, tcp::resolver::results_type const & results, boost::asio::yield_context yield)
|
void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet, std::string const & source, std::string const & destination, std::atomic<int> & send_calls_remaining, tcp::resolver::results_type const & results, boost::asio::yield_context yield)
|
||||||
{
|
{
|
||||||
boost::beast::flat_buffer buffer;
|
boost::beast::flat_buffer buffer;
|
||||||
|
|
@ -161,42 +259,13 @@ void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet,
|
||||||
|
|
||||||
boost::property_tree::ptree rpc_request (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results)
|
boost::property_tree::ptree rpc_request (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results)
|
||||||
{
|
{
|
||||||
debug_assert (results.size () == 1);
|
auto rpc_request = std::make_shared<rpc_request_impl> (request, ioc, results);
|
||||||
|
boost::asio::strand<boost::asio::io_context::executor_type> strand{ ioc.get_executor () };
|
||||||
std::promise<boost::optional<boost::property_tree::ptree>> promise;
|
boost::asio::post (strand,
|
||||||
boost::asio::spawn (ioc, [&ioc, &results, request, &promise] (boost::asio::yield_context yield) {
|
[rpc_request] () {
|
||||||
socket_type socket (ioc);
|
rpc_request->start ();
|
||||||
boost::beast::flat_buffer buffer;
|
|
||||||
http::request<http::string_body> req;
|
|
||||||
http::response<http::string_body> res;
|
|
||||||
|
|
||||||
boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield);
|
|
||||||
std::stringstream ostream;
|
|
||||||
boost::property_tree::write_json (ostream, request);
|
|
||||||
|
|
||||||
req.method (http::verb::post);
|
|
||||||
req.version (11);
|
|
||||||
req.target ("/");
|
|
||||||
req.body () = ostream.str ();
|
|
||||||
req.prepare_payload ();
|
|
||||||
|
|
||||||
http::async_write (socket, req, yield);
|
|
||||||
http::async_read (socket, buffer, res, yield);
|
|
||||||
|
|
||||||
boost::property_tree::ptree json;
|
|
||||||
std::stringstream body (res.body ());
|
|
||||||
boost::property_tree::read_json (body, json);
|
|
||||||
promise.set_value (json);
|
|
||||||
});
|
});
|
||||||
|
return rpc_request->value_get ();
|
||||||
auto future = promise.get_future ();
|
|
||||||
if (future.wait_for (std::chrono::seconds (5)) != std::future_status::ready)
|
|
||||||
{
|
|
||||||
throw std::runtime_error ("RPC request timed out");
|
|
||||||
}
|
|
||||||
auto response = future.get ();
|
|
||||||
debug_assert (response.is_initialized ());
|
|
||||||
return response.value_or (decltype (response)::argument_type{});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void keepalive_rpc (boost::asio::io_context & ioc, tcp::resolver::results_type const & results, uint16_t port)
|
void keepalive_rpc (boost::asio::io_context & ioc, tcp::resolver::results_type const & results, uint16_t port)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue