Use strands with ipc client & rpc server (#2140)

This commit is contained in:
Wesley Shillingford 2019-07-09 07:52:36 +01:00 committed by GitHub
commit 7b792a2caa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 66 additions and 36 deletions

View file

@ -253,6 +253,11 @@ public:
code = code_a;
}
error (boost::system::error_code code_a)
{
code = std::make_error_code (static_cast<std::errc> (code_a.value ()));
}
error (std::string message_a)
{
code = nano::error_common::generic;

View file

@ -14,13 +14,20 @@ public:
virtual void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) = 0;
};
/* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */
#if BOOST_VERSION < 107000
using socket_type = boost::asio::ip::tcp::socket;
#else
using socket_type = boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::executor_type>;
#endif
/** Domain and TCP client socket */
template <typename SOCKET_TYPE, typename ENDPOINT_TYPE>
class socket_client : public nano::ipc::socket_base, public channel
{
public:
socket_client (boost::asio::io_context & io_ctx_a, ENDPOINT_TYPE endpoint_a) :
socket_base (io_ctx_a), endpoint (endpoint_a), socket (io_ctx_a), resolver (io_ctx_a)
socket_base (io_ctx_a), endpoint (endpoint_a), socket (io_ctx_a), resolver (io_ctx_a), strand (io_ctx_a.get_executor ())
{
}
@ -45,29 +52,29 @@ public:
void async_connect (std::function<void(boost::system::error_code const &)> callback_a)
{
this->timer_start (io_timeout);
socket.async_connect (endpoint, [this, callback_a](boost::system::error_code const & ec) {
socket.async_connect (endpoint, boost::asio::bind_executor (strand, [this, callback_a](boost::system::error_code const & ec) {
this->timer_cancel ();
callback_a (ec);
});
}));
}
void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) override
{
this->timer_start (io_timeout);
buffer_a->resize (size_a);
boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), [this, callback_a](boost::system::error_code const & ec, size_t size_a) {
boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), boost::asio::bind_executor (this->strand, [this, callback_a](boost::system::error_code const & ec, size_t size_a) {
this->timer_cancel ();
callback_a (ec, size_a);
});
}));
}
void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) override
{
this->timer_start (io_timeout);
boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) {
boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), boost::asio::bind_executor (this->strand, [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) {
this->timer_cancel ();
callback_a (ec, size_a);
});
}));
}
/** Shut down and close socket */
@ -82,6 +89,7 @@ private:
SOCKET_TYPE socket;
boost::asio::ip::tcp::resolver resolver;
std::chrono::seconds io_timeout{ 60 };
boost::asio::strand<boost::asio::io_context::executor_type> strand;
};
/**
@ -98,7 +106,7 @@ public:
void connect (std::string const & host_a, uint16_t port_a, std::function<void(nano::error)> callback_a)
{
tcp_client = std::make_shared<socket_client<boost::asio::ip::tcp::socket, boost::asio::ip::tcp::endpoint>> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a));
tcp_client = std::make_shared<socket_client<socket_type, boost::asio::ip::tcp::endpoint>> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a));
tcp_client->async_resolve (host_a, port_a, [this, callback_a](boost::system::error_code const & ec_resolve_a, boost::asio::ip::tcp::endpoint endpoint_a) {
if (!ec_resolve_a)
@ -136,7 +144,7 @@ public:
private:
boost::asio::io_context & io_ctx;
std::shared_ptr<socket_client<boost::asio::ip::tcp::socket, boost::asio::ip::tcp::endpoint>> tcp_client;
std::shared_ptr<socket_client<socket_type, boost::asio::ip::tcp::endpoint>> tcp_client;
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
std::shared_ptr<socket_client<boost::asio::local::stream_protocol::socket, boost::asio::local::stream_protocol::endpoint>> domain_client;
#endif

View file

@ -46,7 +46,7 @@ void nano::rpc::start ()
void nano::rpc::accept ()
{
auto connection (std::make_shared<nano::rpc_connection> (config, io_ctx, logger, rpc_handler_interface));
acceptor.async_accept (connection->socket, [this, connection](boost::system::error_code const & ec) {
acceptor.async_accept (connection->socket, boost::asio::bind_executor (connection->strand, [this, connection](boost::system::error_code const & ec) {
if (ec != boost::asio::error::operation_aborted && acceptor.is_open ())
{
accept ();
@ -59,7 +59,7 @@ void nano::rpc::accept ()
{
logger.always_log (boost::str (boost::format ("Error accepting RPC connections: %1% (%2%)") % ec.message () % ec.value ()));
}
});
}));
}
void nano::rpc::stop ()

View file

@ -11,6 +11,7 @@
nano::rpc_connection::rpc_connection (nano::rpc_config const & rpc_config, boost::asio::io_context & io_ctx, nano::logger_mt & logger, nano::rpc_handler_interface & rpc_handler_interface) :
socket (io_ctx),
strand (io_ctx.get_executor ()),
io_ctx (io_ctx),
logger (logger),
rpc_config (rpc_config),
@ -56,7 +57,7 @@ void nano::rpc_connection::read ()
auto this_l (shared_from_this ());
auto header_parser (std::make_shared<boost::beast::http::request_parser<boost::beast::http::empty_body>> ());
header_parser->body_limit (rpc_config.max_request_size);
boost::beast::http::async_read_header (socket, buffer, *header_parser, [this_l, header_parser](boost::system::error_code const & ec, size_t bytes_transferred) {
boost::beast::http::async_read_header (socket, buffer, *header_parser, boost::asio::bind_executor (strand, [this_l, header_parser](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
if (boost::iequals (header_parser->get ()[boost::beast::http::field::expect], "100-continue"))
@ -65,7 +66,7 @@ void nano::rpc_connection::read ()
continue_response->version (11);
continue_response->result (boost::beast::http::status::continue_);
continue_response->set (boost::beast::http::field::server, "nano");
boost::beast::http::async_write (this_l->socket, *continue_response, [this_l, continue_response](boost::system::error_code const & ec, size_t bytes_transferred) {});
boost::beast::http::async_write (this_l->socket, *continue_response, boost::asio::bind_executor (this_l->strand, [this_l, continue_response](boost::system::error_code const & ec, size_t bytes_transferred) {}));
}
this_l->parse_request (header_parser);
@ -77,36 +78,39 @@ void nano::rpc_connection::read ()
// Respond with the reason for the invalid header
auto response_handler ([this_l](std::string const & tree_a) {
this_l->write_result (tree_a, 11);
boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
boost::beast::http::async_write (this_l->socket, this_l->res, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
this_l->write_completion_handler (this_l);
});
}));
});
json_error_response (response_handler, std::string ("Invalid header: ") + ec.message ());
}
});
}));
}
void nano::rpc_connection::parse_request (std::shared_ptr<boost::beast::http::request_parser<boost::beast::http::empty_body>> header_parser)
{
auto this_l (shared_from_this ());
auto body_parser (std::make_shared<boost::beast::http::request_parser<boost::beast::http::string_body>> (std::move (*header_parser)));
boost::beast::http::async_read (socket, buffer, *body_parser, [this_l, body_parser](boost::system::error_code const & ec, size_t bytes_transferred) {
boost::beast::http::async_read (socket, buffer, *body_parser, boost::asio::bind_executor (strand, [this_l, body_parser](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
// equivalent to background
this_l->io_ctx.post ([this_l, body_parser]() {
auto & req (body_parser->get ());
auto start (std::chrono::steady_clock::now ());
auto version (req.version ());
std::string request_id (boost::str (boost::format ("%1%") % boost::io::group (std::hex, std::showbase, reinterpret_cast<uintptr_t> (this_l.get ()))));
std::stringstream ss;
ss << std::hex << std::showbase << reinterpret_cast<uintptr_t> (this_l.get ());
auto request_id = ss.str ();
auto response_handler ([this_l, version, start, request_id](std::string const & tree_a) {
auto body = tree_a;
this_l->write_result (body, version);
boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
boost::beast::http::async_write (this_l->socket, this_l->res, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
this_l->write_completion_handler (this_l);
});
}));
this_l->logger.always_log (boost::str (boost::format ("RPC request %2% completed in: %1% microseconds") % std::chrono::duration_cast<std::chrono::microseconds> (std::chrono::steady_clock::now () - start).count () % request_id));
std::stringstream ss;
ss << "RPC request " << request_id << " completed in: " << std::chrono::duration_cast<std::chrono::microseconds> (std::chrono::steady_clock::now () - start).count () << " microseconds";
this_l->logger.always_log (ss.str ().c_str ());
});
auto method = req.method ();
switch (method)
@ -121,9 +125,9 @@ void nano::rpc_connection::parse_request (std::shared_ptr<boost::beast::http::re
{
this_l->prepare_head (version);
this_l->res.prepare_payload ();
boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
boost::beast::http::async_write (this_l->socket, this_l->res, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
this_l->write_completion_handler (this_l);
});
}));
break;
}
default:
@ -138,7 +142,7 @@ void nano::rpc_connection::parse_request (std::shared_ptr<boost::beast::http::re
{
this_l->logger.always_log ("RPC read error: ", ec.message ());
}
});
}));
}
void nano::rpc_connection::write_completion_handler (std::shared_ptr<nano::rpc_connection> rpc_connection)

View file

@ -2,10 +2,18 @@
#include <nano/rpc/rpc_handler.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast.hpp>
#include <atomic>
/* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */
#if BOOST_VERSION < 107000
using socket_type = boost::asio::ip::tcp::socket;
#else
using socket_type = boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::executor_type>;
#endif
namespace nano
{
class logger_mt;
@ -25,9 +33,10 @@ public:
void read ();
boost::asio::ip::tcp::socket socket;
socket_type socket;
boost::beast::flat_buffer buffer;
boost::beast::http::response<boost::beast::http::string_body> res;
boost::asio::strand<boost::asio::io_context::executor_type> strand;
std::atomic_flag responded;
boost::asio::io_context & io_ctx;
nano::logger_mt & logger;

View file

@ -40,7 +40,7 @@ void nano::rpc_connection_secure::handle_handshake (const boost::system::error_c
void nano::rpc_connection_secure::write_completion_handler (std::shared_ptr<nano::rpc_connection> rpc)
{
auto rpc_connection_secure = boost::polymorphic_pointer_downcast<nano::rpc_connection_secure> (rpc);
rpc_connection_secure->stream.async_shutdown ([rpc_connection_secure](auto const & ec_shutdown) {
rpc_connection_secure->stream.async_shutdown (boost::asio::bind_executor (rpc->strand, [rpc_connection_secure](auto const & ec_shutdown) {
rpc_connection_secure->on_shutdown (ec_shutdown);
});
}));
}

View file

@ -22,6 +22,6 @@ public:
void on_shutdown (const boost::system::error_code & error);
private:
boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> stream;
boost::asio::ssl::stream<socket_type &> stream;
};
}

View file

@ -50,14 +50,19 @@ void nano::rpc_handler::process_request ()
json_error_response (response, "Max JSON depth exceeded");
}
else
{
boost::property_tree::ptree request;
{
std::stringstream ss;
ss << body;
boost::property_tree::ptree request;
boost::property_tree::read_json (ss, request);
}
auto action = request.get<std::string> ("action");
logger.always_log (boost::str (boost::format ("%1% ") % request_id), filter_request (request));
// Creating same string via stringstream as using it directly is generating a TSAN warning
std::stringstream ss;
ss << request_id;
logger.always_log (ss.str (), " ", filter_request (request));
// Check if this is a RPC command which requires RPC enabled control
std::error_code rpc_control_disabled_ec = nano::error_rpc::rpc_control_disabled;

View file

@ -17,7 +17,6 @@ class rpc_handler : public std::enable_shared_from_this<nano::rpc_handler>
public:
rpc_handler (nano::rpc_config const & rpc_config, std::string const & body_a, std::string const & request_id_a, std::function<void(std::string const &)> const & response_a, nano::rpc_handler_interface & rpc_handler_interface_a, nano::logger_mt & logger);
void process_request ();
void read (std::shared_ptr<std::vector<uint8_t>> req, std::shared_ptr<std::vector<uint8_t>> res, const std::string & action);
private:
std::string body;

View file

@ -101,7 +101,7 @@ ssl_context (boost::asio::ssl::context::tlsv12_server)
void nano::rpc_secure::accept ()
{
auto connection (std::make_shared<nano::rpc_connection_secure> (config, io_ctx, logger, rpc_handler_interface, this->ssl_context));
acceptor.async_accept (connection->socket, [this, connection](boost::system::error_code const & ec) {
acceptor.async_accept (connection->socket, boost::asio::bind_executor (connection->strand, [this, connection](boost::system::error_code const & ec) {
if (acceptor.is_open ())
{
accept ();
@ -114,5 +114,5 @@ void nano::rpc_secure::accept ()
{
logger.always_log (boost::str (boost::format ("Error accepting RPC connections: %1%") % ec));
}
});
}));
}