Move send_receive coroutine implementation to async callback style
This commit is contained in:
		
					parent
					
						
							
								11b20a7667
							
						
					
				
			
			
				commit
				
					
						ec6af1335f
					
				
			
		
					 1 changed files with 221 additions and 74 deletions
				
			
		| 
						 | 
				
			
			@ -1,7 +1,5 @@
 | 
			
		|||
#include <nano/boost/asio/bind_executor.hpp>
 | 
			
		||||
#include <nano/boost/asio/connect.hpp>
 | 
			
		||||
#include <nano/boost/asio/ip/tcp.hpp>
 | 
			
		||||
#include <nano/boost/asio/spawn.hpp>
 | 
			
		||||
#include <nano/boost/asio/strand.hpp>
 | 
			
		||||
#include <nano/boost/beast/core/flat_buffer.hpp>
 | 
			
		||||
#include <nano/boost/beast/http.hpp>
 | 
			
		||||
| 
						 | 
				
			
			@ -92,6 +90,222 @@ public:
 | 
			
		|||
	bool error{ false };
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class send_receive_impl;
 | 
			
		||||
class start_receive_session_impl;
 | 
			
		||||
class rpc_request_impl;
 | 
			
		||||
 | 
			
		||||
class start_receive_session_impl : public std::enable_shared_from_this<start_receive_session_impl>
 | 
			
		||||
{
 | 
			
		||||
private:
 | 
			
		||||
	socket_type socket;
 | 
			
		||||
	std::atomic<int> & send_calls_remaining;
 | 
			
		||||
	tcp::resolver::results_type const & results;
 | 
			
		||||
 | 
			
		||||
	std::string const wallet;
 | 
			
		||||
	std::string const source;
 | 
			
		||||
	std::string const destination;
 | 
			
		||||
 | 
			
		||||
	std::string const block;
 | 
			
		||||
 | 
			
		||||
	boost::beast::flat_buffer buffer;
 | 
			
		||||
	http::request<http::string_body> req;
 | 
			
		||||
	http::response<http::string_body> res;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
	start_receive_session_impl (
 | 
			
		||||
	boost::asio::io_context & io_ctx_a,
 | 
			
		||||
	tcp::resolver::results_type const & results_a,
 | 
			
		||||
	std::string const & wallet_a,
 | 
			
		||||
	std::string const & source_a,
 | 
			
		||||
	std::string const & destination_a,
 | 
			
		||||
	std::atomic<int> & send_calls_remaining_a,
 | 
			
		||||
	std::string const block_a) :
 | 
			
		||||
		socket{ io_ctx_a },
 | 
			
		||||
		send_calls_remaining{ send_calls_remaining_a },
 | 
			
		||||
		results{ results_a },
 | 
			
		||||
		wallet{ wallet_a },
 | 
			
		||||
		source{ source_a },
 | 
			
		||||
		destination{ destination_a },
 | 
			
		||||
		block{ std::move (block_a) }
 | 
			
		||||
	{
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void start ()
 | 
			
		||||
	{
 | 
			
		||||
		async_connect ();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
	void async_connect ()
 | 
			
		||||
	{
 | 
			
		||||
		boost::asio::async_connect (socket, results.cbegin (), results.cend (),
 | 
			
		||||
		[this_l = shared_from_this ()] (boost::system::error_code const & ec, tcp::resolver::iterator iterator) {
 | 
			
		||||
			this_l->request_receive ();
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void request_receive ()
 | 
			
		||||
	{
 | 
			
		||||
		boost::property_tree::ptree request;
 | 
			
		||||
		request.put ("action", "receive");
 | 
			
		||||
		request.put ("wallet", wallet);
 | 
			
		||||
		request.put ("account", destination);
 | 
			
		||||
		request.put ("block", block);
 | 
			
		||||
		std::stringstream ostream;
 | 
			
		||||
		boost::property_tree::write_json (ostream, request);
 | 
			
		||||
 | 
			
		||||
		req.method (http::verb::post);
 | 
			
		||||
		req.version (11);
 | 
			
		||||
		req.target ("/");
 | 
			
		||||
		req.body () = ostream.str ();
 | 
			
		||||
		req.prepare_payload ();
 | 
			
		||||
 | 
			
		||||
		async_write ();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void async_write ()
 | 
			
		||||
	{
 | 
			
		||||
		http::async_write (socket, req,
 | 
			
		||||
		[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
 | 
			
		||||
			debug_assert (!error_code);
 | 
			
		||||
			debug_assert (bytes_transferred > 0);
 | 
			
		||||
			this_l->async_read ();
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void async_read ()
 | 
			
		||||
	{
 | 
			
		||||
		http::async_read (socket, buffer, res,
 | 
			
		||||
		[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
 | 
			
		||||
			debug_assert (!error_code);
 | 
			
		||||
			debug_assert (bytes_transferred > 0);
 | 
			
		||||
			--this_l->send_calls_remaining;
 | 
			
		||||
			this_l->socket_shutdown ();
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void socket_shutdown ()
 | 
			
		||||
	{
 | 
			
		||||
		// Gracefully close the socket
 | 
			
		||||
		boost::system::error_code ec;
 | 
			
		||||
		socket.shutdown (tcp::socket::shutdown_both, ec);
 | 
			
		||||
		debug_assert (!ec || ec == boost::system::errc::not_connected);
 | 
			
		||||
	}
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class send_receive_impl : public std::enable_shared_from_this<send_receive_impl>
 | 
			
		||||
{
 | 
			
		||||
private:
 | 
			
		||||
	boost::asio::io_context & io_ctx;
 | 
			
		||||
	socket_type socket;
 | 
			
		||||
 | 
			
		||||
	std::string const wallet;
 | 
			
		||||
	std::string const source;
 | 
			
		||||
	std::string const destination;
 | 
			
		||||
 | 
			
		||||
	std::atomic<int> & send_calls_remaining;
 | 
			
		||||
	tcp::resolver::results_type const results;
 | 
			
		||||
 | 
			
		||||
	boost::beast::flat_buffer buffer;
 | 
			
		||||
	http::request<http::string_body> req;
 | 
			
		||||
	http::response<http::string_body> res;
 | 
			
		||||
 | 
			
		||||
	std::shared_ptr<start_receive_session_impl> start_receive_session = nullptr;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
	send_receive_impl (
 | 
			
		||||
	boost::asio::io_context & io_ctx_a,
 | 
			
		||||
	std::string const & wallet_a,
 | 
			
		||||
	std::string const & source_a,
 | 
			
		||||
	std::string const & destination_a,
 | 
			
		||||
	std::atomic<int> & send_calls_remaining_a,
 | 
			
		||||
	tcp::resolver::results_type const & results_a) :
 | 
			
		||||
		io_ctx{ io_ctx_a },
 | 
			
		||||
		socket{ io_ctx },
 | 
			
		||||
		wallet{ wallet_a },
 | 
			
		||||
		source{ source_a },
 | 
			
		||||
		destination{ destination_a },
 | 
			
		||||
		send_calls_remaining{ send_calls_remaining_a },
 | 
			
		||||
		results{ results_a }
 | 
			
		||||
	{
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void start ()
 | 
			
		||||
	{
 | 
			
		||||
		async_connect ();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
	void async_connect ()
 | 
			
		||||
	{
 | 
			
		||||
		boost::asio::async_connect (socket, results.cbegin (), results.cend (),
 | 
			
		||||
		[this_l = shared_from_this ()] (boost::system::error_code const & ec, tcp::resolver::iterator iterator) {
 | 
			
		||||
			this_l->request_send ();
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void request_send ()
 | 
			
		||||
	{
 | 
			
		||||
		boost::property_tree::ptree request;
 | 
			
		||||
		request.put ("action", "send");
 | 
			
		||||
		request.put ("wallet", wallet);
 | 
			
		||||
		request.put ("source", source);
 | 
			
		||||
		request.put ("destination", destination);
 | 
			
		||||
		request.put ("amount", "1");
 | 
			
		||||
		std::stringstream ostream;
 | 
			
		||||
		boost::property_tree::write_json (ostream, request);
 | 
			
		||||
 | 
			
		||||
		req.method (http::verb::post);
 | 
			
		||||
		req.version (11);
 | 
			
		||||
		req.target ("/");
 | 
			
		||||
		req.body () = ostream.str ();
 | 
			
		||||
		req.prepare_payload ();
 | 
			
		||||
 | 
			
		||||
		async_write ();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void async_write ()
 | 
			
		||||
	{
 | 
			
		||||
		http::async_write (socket, req,
 | 
			
		||||
		[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
 | 
			
		||||
			debug_assert (!error_code);
 | 
			
		||||
			debug_assert (bytes_transferred > 0);
 | 
			
		||||
			this_l->async_read ();
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void async_read ()
 | 
			
		||||
	{
 | 
			
		||||
		http::async_read (socket, buffer, res,
 | 
			
		||||
		[this_l = shared_from_this ()] (boost::system::error_code const & error_code, std::size_t bytes_transferred) {
 | 
			
		||||
			debug_assert (!error_code);
 | 
			
		||||
			debug_assert (bytes_transferred > 0);
 | 
			
		||||
			this_l->receive_start ();
 | 
			
		||||
			this_l->socket_shutdown ();
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void socket_shutdown ()
 | 
			
		||||
	{
 | 
			
		||||
		// Shut down send socket
 | 
			
		||||
		boost::system::error_code ec;
 | 
			
		||||
		socket.shutdown (tcp::socket::shutdown_both, ec);
 | 
			
		||||
		debug_assert (!ec || ec == boost::system::errc::not_connected);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void receive_start ()
 | 
			
		||||
	{
 | 
			
		||||
		boost::property_tree::ptree json;
 | 
			
		||||
		std::stringstream body (res.body ());
 | 
			
		||||
		boost::property_tree::read_json (body, json);
 | 
			
		||||
		auto block = json.get<std::string> ("block");
 | 
			
		||||
 | 
			
		||||
		start_receive_session = std::make_shared<start_receive_session_impl> (
 | 
			
		||||
		io_ctx, results, wallet, source, destination, send_calls_remaining, block);
 | 
			
		||||
		start_receive_session->start ();
 | 
			
		||||
	}
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class rpc_request_impl : public std::enable_shared_from_this<rpc_request_impl>
 | 
			
		||||
{
 | 
			
		||||
private:
 | 
			
		||||
| 
						 | 
				
			
			@ -188,75 +402,6 @@ private:
 | 
			
		|||
	}
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
void send_receive (boost::asio::io_context & io_ctx, std::string const & wallet, std::string const & source, std::string const & destination, std::atomic<int> & send_calls_remaining, tcp::resolver::results_type const & results, boost::asio::yield_context yield)
 | 
			
		||||
{
 | 
			
		||||
	boost::beast::flat_buffer buffer;
 | 
			
		||||
	http::request<http::string_body> req;
 | 
			
		||||
	http::response<http::string_body> res;
 | 
			
		||||
	socket_type socket (io_ctx);
 | 
			
		||||
 | 
			
		||||
	boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield);
 | 
			
		||||
 | 
			
		||||
	boost::property_tree::ptree request;
 | 
			
		||||
	request.put ("action", "send");
 | 
			
		||||
	request.put ("wallet", wallet);
 | 
			
		||||
	request.put ("source", source);
 | 
			
		||||
	request.put ("destination", destination);
 | 
			
		||||
	request.put ("amount", "1");
 | 
			
		||||
	std::stringstream ostream;
 | 
			
		||||
	boost::property_tree::write_json (ostream, request);
 | 
			
		||||
 | 
			
		||||
	req.method (http::verb::post);
 | 
			
		||||
	req.version (11);
 | 
			
		||||
	req.target ("/");
 | 
			
		||||
	req.body () = ostream.str ();
 | 
			
		||||
	req.prepare_payload ();
 | 
			
		||||
 | 
			
		||||
	http::async_write (socket, req, yield);
 | 
			
		||||
	http::async_read (socket, buffer, res, yield);
 | 
			
		||||
	boost::property_tree::ptree json;
 | 
			
		||||
	std::stringstream body (res.body ());
 | 
			
		||||
	boost::property_tree::read_json (body, json);
 | 
			
		||||
	auto block = json.get<std::string> ("block");
 | 
			
		||||
 | 
			
		||||
	// Shut down send socket
 | 
			
		||||
	boost::system::error_code ec;
 | 
			
		||||
	socket.shutdown (tcp::socket::shutdown_both, ec);
 | 
			
		||||
	debug_assert (!ec || ec == boost::system::errc::not_connected);
 | 
			
		||||
 | 
			
		||||
	{
 | 
			
		||||
		// Start receive session
 | 
			
		||||
		boost::beast::flat_buffer buffer;
 | 
			
		||||
		http::request<http::string_body> req;
 | 
			
		||||
		http::response<http::string_body> res1;
 | 
			
		||||
		socket_type socket (io_ctx);
 | 
			
		||||
 | 
			
		||||
		boost::asio::async_connect (socket, results.cbegin (), results.cend (), yield);
 | 
			
		||||
 | 
			
		||||
		boost::property_tree::ptree request;
 | 
			
		||||
		request.put ("action", "receive");
 | 
			
		||||
		request.put ("wallet", wallet);
 | 
			
		||||
		request.put ("account", destination);
 | 
			
		||||
		request.put ("block", block);
 | 
			
		||||
		std::stringstream ostream;
 | 
			
		||||
		boost::property_tree::write_json (ostream, request);
 | 
			
		||||
 | 
			
		||||
		req.method (http::verb::post);
 | 
			
		||||
		req.version (11);
 | 
			
		||||
		req.target ("/");
 | 
			
		||||
		req.body () = ostream.str ();
 | 
			
		||||
		req.prepare_payload ();
 | 
			
		||||
 | 
			
		||||
		http::async_write (socket, req, yield);
 | 
			
		||||
		http::async_read (socket, buffer, res, yield);
 | 
			
		||||
		--send_calls_remaining;
 | 
			
		||||
		// Gracefully close the socket
 | 
			
		||||
		boost::system::error_code ec;
 | 
			
		||||
		socket.shutdown (tcp::socket::shutdown_both, ec);
 | 
			
		||||
		debug_assert (!ec || ec == boost::system::errc::not_connected);
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
boost::property_tree::ptree rpc_request (boost::property_tree::ptree const & request, boost::asio::io_context & ioc, tcp::resolver::results_type const & results)
 | 
			
		||||
{
 | 
			
		||||
	auto rpc_request = std::make_shared<rpc_request_impl> (request, ioc, results);
 | 
			
		||||
| 
						 | 
				
			
			@ -490,7 +635,6 @@ int main (int argc, char * const * argv)
 | 
			
		|||
		std::uniform_int_distribution<size_t> dist (0, destination_accounts.size () - 1);
 | 
			
		||||
 | 
			
		||||
		std::atomic<int> send_calls_remaining{ send_count };
 | 
			
		||||
 | 
			
		||||
		for (auto i = 0; i < send_count; ++i)
 | 
			
		||||
		{
 | 
			
		||||
			account * destination_account;
 | 
			
		||||
| 
						 | 
				
			
			@ -505,8 +649,11 @@ int main (int argc, char * const * argv)
 | 
			
		|||
			}
 | 
			
		||||
 | 
			
		||||
			// Send from genesis account to different accounts and receive the funds
 | 
			
		||||
			boost::asio::spawn (ioc, [&ioc, &primary_node_results, &wallet, destination_account, &send_calls_remaining] (boost::asio::yield_context yield) {
 | 
			
		||||
				send_receive (ioc, wallet, nano::dev::genesis->account ().to_account (), destination_account->as_string, send_calls_remaining, primary_node_results, yield);
 | 
			
		||||
			auto send_receive = std::make_shared<send_receive_impl> (ioc, wallet, nano::dev::genesis->account ().to_account (), destination_account->as_string, send_calls_remaining, primary_node_results);
 | 
			
		||||
			boost::asio::strand<boost::asio::io_context::executor_type> strand{ ioc.get_executor () };
 | 
			
		||||
			boost::asio::post (strand,
 | 
			
		||||
			[send_receive] () {
 | 
			
		||||
				send_receive->start ();
 | 
			
		||||
			});
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue