diff --git a/nano/lib/errors.hpp b/nano/lib/errors.hpp index 6a26388e8..2eba2ca6c 100644 --- a/nano/lib/errors.hpp +++ b/nano/lib/errors.hpp @@ -253,6 +253,11 @@ public: code = code_a; } + error (boost::system::error_code code_a) + { + code = std::make_error_code (static_cast (code_a.value ())); + } + error (std::string message_a) { code = nano::error_common::generic; diff --git a/nano/lib/ipc_client.cpp b/nano/lib/ipc_client.cpp index b490081eb..4effd0088 100644 --- a/nano/lib/ipc_client.cpp +++ b/nano/lib/ipc_client.cpp @@ -14,13 +14,20 @@ public: virtual void async_write (std::shared_ptr> buffer_a, std::function callback_a) = 0; }; +/* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */ +#if BOOST_VERSION < 107000 +using socket_type = boost::asio::ip::tcp::socket; +#else +using socket_type = boost::asio::basic_stream_socket; +#endif + /** Domain and TCP client socket */ template class socket_client : public nano::ipc::socket_base, public channel { public: socket_client (boost::asio::io_context & io_ctx_a, ENDPOINT_TYPE endpoint_a) : - socket_base (io_ctx_a), endpoint (endpoint_a), socket (io_ctx_a), resolver (io_ctx_a) + socket_base (io_ctx_a), endpoint (endpoint_a), socket (io_ctx_a), resolver (io_ctx_a), strand (io_ctx_a.get_executor ()) { } @@ -45,29 +52,29 @@ public: void async_connect (std::function callback_a) { this->timer_start (io_timeout); - socket.async_connect (endpoint, [this, callback_a](boost::system::error_code const & ec) { + socket.async_connect (endpoint, boost::asio::bind_executor (strand, [this, callback_a](boost::system::error_code const & ec) { this->timer_cancel (); callback_a (ec); - }); + })); } void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) override { this->timer_start (io_timeout); buffer_a->resize (size_a); - boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), [this, callback_a](boost::system::error_code const & ec, size_t size_a) { + boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), boost::asio::bind_executor (this->strand, [this, callback_a](boost::system::error_code const & ec, size_t size_a) { this->timer_cancel (); callback_a (ec, size_a); - }); + })); } void async_write (std::shared_ptr> buffer_a, std::function callback_a) override { this->timer_start (io_timeout); - boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) { + boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), boost::asio::bind_executor (this->strand, [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) { this->timer_cancel (); callback_a (ec, size_a); - }); + })); } /** Shut down and close socket */ @@ -82,6 +89,7 @@ private: SOCKET_TYPE socket; boost::asio::ip::tcp::resolver resolver; std::chrono::seconds io_timeout{ 60 }; + boost::asio::strand strand; }; /** @@ -98,7 +106,7 @@ public: void connect (std::string const & host_a, uint16_t port_a, std::function callback_a) { - tcp_client = std::make_shared> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a)); + tcp_client = std::make_shared> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a)); tcp_client->async_resolve (host_a, port_a, [this, callback_a](boost::system::error_code const & ec_resolve_a, boost::asio::ip::tcp::endpoint endpoint_a) { if (!ec_resolve_a) @@ -136,7 +144,7 @@ public: private: boost::asio::io_context & io_ctx; - std::shared_ptr> tcp_client; + std::shared_ptr> tcp_client; #if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) std::shared_ptr> domain_client; #endif diff --git a/nano/rpc/rpc.cpp b/nano/rpc/rpc.cpp index bc2df761b..e775ec747 100644 --- a/nano/rpc/rpc.cpp +++ b/nano/rpc/rpc.cpp @@ -46,7 +46,7 @@ void nano::rpc::start () void nano::rpc::accept () { auto connection (std::make_shared (config, io_ctx, logger, rpc_handler_interface)); - acceptor.async_accept (connection->socket, [this, connection](boost::system::error_code const & ec) { + acceptor.async_accept (connection->socket, boost::asio::bind_executor (connection->strand, [this, connection](boost::system::error_code const & ec) { if (ec != boost::asio::error::operation_aborted && acceptor.is_open ()) { accept (); @@ -59,7 +59,7 @@ void nano::rpc::accept () { logger.always_log (boost::str (boost::format ("Error accepting RPC connections: %1% (%2%)") % ec.message () % ec.value ())); } - }); + })); } void nano::rpc::stop () diff --git a/nano/rpc/rpc_connection.cpp b/nano/rpc/rpc_connection.cpp index 3ac9e954b..b85967003 100644 --- a/nano/rpc/rpc_connection.cpp +++ b/nano/rpc/rpc_connection.cpp @@ -11,6 +11,7 @@ nano::rpc_connection::rpc_connection (nano::rpc_config const & rpc_config, boost::asio::io_context & io_ctx, nano::logger_mt & logger, nano::rpc_handler_interface & rpc_handler_interface) : socket (io_ctx), +strand (io_ctx.get_executor ()), io_ctx (io_ctx), logger (logger), rpc_config (rpc_config), @@ -56,7 +57,7 @@ void nano::rpc_connection::read () auto this_l (shared_from_this ()); auto header_parser (std::make_shared> ()); header_parser->body_limit (rpc_config.max_request_size); - boost::beast::http::async_read_header (socket, buffer, *header_parser, [this_l, header_parser](boost::system::error_code const & ec, size_t bytes_transferred) { + boost::beast::http::async_read_header (socket, buffer, *header_parser, boost::asio::bind_executor (strand, [this_l, header_parser](boost::system::error_code const & ec, size_t bytes_transferred) { if (!ec) { if (boost::iequals (header_parser->get ()[boost::beast::http::field::expect], "100-continue")) @@ -65,7 +66,7 @@ void nano::rpc_connection::read () continue_response->version (11); continue_response->result (boost::beast::http::status::continue_); continue_response->set (boost::beast::http::field::server, "nano"); - boost::beast::http::async_write (this_l->socket, *continue_response, [this_l, continue_response](boost::system::error_code const & ec, size_t bytes_transferred) {}); + boost::beast::http::async_write (this_l->socket, *continue_response, boost::asio::bind_executor (this_l->strand, [this_l, continue_response](boost::system::error_code const & ec, size_t bytes_transferred) {})); } this_l->parse_request (header_parser); @@ -77,36 +78,39 @@ void nano::rpc_connection::read () // Respond with the reason for the invalid header auto response_handler ([this_l](std::string const & tree_a) { this_l->write_result (tree_a, 11); - boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) { + boost::beast::http::async_write (this_l->socket, this_l->res, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) { this_l->write_completion_handler (this_l); - }); + })); }); json_error_response (response_handler, std::string ("Invalid header: ") + ec.message ()); } - }); + })); } void nano::rpc_connection::parse_request (std::shared_ptr> header_parser) { auto this_l (shared_from_this ()); auto body_parser (std::make_shared> (std::move (*header_parser))); - boost::beast::http::async_read (socket, buffer, *body_parser, [this_l, body_parser](boost::system::error_code const & ec, size_t bytes_transferred) { + boost::beast::http::async_read (socket, buffer, *body_parser, boost::asio::bind_executor (strand, [this_l, body_parser](boost::system::error_code const & ec, size_t bytes_transferred) { if (!ec) { - // equivalent to background this_l->io_ctx.post ([this_l, body_parser]() { auto & req (body_parser->get ()); auto start (std::chrono::steady_clock::now ()); auto version (req.version ()); - std::string request_id (boost::str (boost::format ("%1%") % boost::io::group (std::hex, std::showbase, reinterpret_cast (this_l.get ())))); + std::stringstream ss; + ss << std::hex << std::showbase << reinterpret_cast (this_l.get ()); + auto request_id = ss.str (); auto response_handler ([this_l, version, start, request_id](std::string const & tree_a) { auto body = tree_a; this_l->write_result (body, version); - boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) { + boost::beast::http::async_write (this_l->socket, this_l->res, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) { this_l->write_completion_handler (this_l); - }); + })); - this_l->logger.always_log (boost::str (boost::format ("RPC request %2% completed in: %1% microseconds") % std::chrono::duration_cast (std::chrono::steady_clock::now () - start).count () % request_id)); + std::stringstream ss; + ss << "RPC request " << request_id << " completed in: " << std::chrono::duration_cast (std::chrono::steady_clock::now () - start).count () << " microseconds"; + this_l->logger.always_log (ss.str ().c_str ()); }); auto method = req.method (); switch (method) @@ -121,9 +125,9 @@ void nano::rpc_connection::parse_request (std::shared_ptrprepare_head (version); this_l->res.prepare_payload (); - boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) { + boost::beast::http::async_write (this_l->socket, this_l->res, boost::asio::bind_executor (this_l->strand, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) { this_l->write_completion_handler (this_l); - }); + })); break; } default: @@ -138,7 +142,7 @@ void nano::rpc_connection::parse_request (std::shared_ptrlogger.always_log ("RPC read error: ", ec.message ()); } - }); + })); } void nano::rpc_connection::write_completion_handler (std::shared_ptr rpc_connection) diff --git a/nano/rpc/rpc_connection.hpp b/nano/rpc/rpc_connection.hpp index 610c18e42..ee0287fb9 100644 --- a/nano/rpc/rpc_connection.hpp +++ b/nano/rpc/rpc_connection.hpp @@ -2,10 +2,18 @@ #include +#include #include #include +/* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */ +#if BOOST_VERSION < 107000 +using socket_type = boost::asio::ip::tcp::socket; +#else +using socket_type = boost::asio::basic_stream_socket; +#endif + namespace nano { class logger_mt; @@ -25,9 +33,10 @@ public: void read (); - boost::asio::ip::tcp::socket socket; + socket_type socket; boost::beast::flat_buffer buffer; boost::beast::http::response res; + boost::asio::strand strand; std::atomic_flag responded; boost::asio::io_context & io_ctx; nano::logger_mt & logger; diff --git a/nano/rpc/rpc_connection_secure.cpp b/nano/rpc/rpc_connection_secure.cpp index fa9e738bf..cb1998f88 100644 --- a/nano/rpc/rpc_connection_secure.cpp +++ b/nano/rpc/rpc_connection_secure.cpp @@ -40,7 +40,7 @@ void nano::rpc_connection_secure::handle_handshake (const boost::system::error_c void nano::rpc_connection_secure::write_completion_handler (std::shared_ptr rpc) { auto rpc_connection_secure = boost::polymorphic_pointer_downcast (rpc); - rpc_connection_secure->stream.async_shutdown ([rpc_connection_secure](auto const & ec_shutdown) { + rpc_connection_secure->stream.async_shutdown (boost::asio::bind_executor (rpc->strand, [rpc_connection_secure](auto const & ec_shutdown) { rpc_connection_secure->on_shutdown (ec_shutdown); - }); + })); } diff --git a/nano/rpc/rpc_connection_secure.hpp b/nano/rpc/rpc_connection_secure.hpp index 16f8fa822..d934f04d4 100644 --- a/nano/rpc/rpc_connection_secure.hpp +++ b/nano/rpc/rpc_connection_secure.hpp @@ -22,6 +22,6 @@ public: void on_shutdown (const boost::system::error_code & error); private: - boost::asio::ssl::stream stream; + boost::asio::ssl::stream stream; }; } diff --git a/nano/rpc/rpc_handler.cpp b/nano/rpc/rpc_handler.cpp index 75fc44fd9..298ba1b21 100644 --- a/nano/rpc/rpc_handler.cpp +++ b/nano/rpc/rpc_handler.cpp @@ -51,13 +51,18 @@ void nano::rpc_handler::process_request () } else { - std::stringstream ss; - ss << body; boost::property_tree::ptree request; - boost::property_tree::read_json (ss, request); + { + std::stringstream ss; + ss << body; + boost::property_tree::read_json (ss, request); + } auto action = request.get ("action"); - logger.always_log (boost::str (boost::format ("%1% ") % request_id), filter_request (request)); + // Creating same string via stringstream as using it directly is generating a TSAN warning + std::stringstream ss; + ss << request_id; + logger.always_log (ss.str (), " ", filter_request (request)); // Check if this is a RPC command which requires RPC enabled control std::error_code rpc_control_disabled_ec = nano::error_rpc::rpc_control_disabled; diff --git a/nano/rpc/rpc_handler.hpp b/nano/rpc/rpc_handler.hpp index 226f4fce5..3ef400b31 100644 --- a/nano/rpc/rpc_handler.hpp +++ b/nano/rpc/rpc_handler.hpp @@ -17,7 +17,6 @@ class rpc_handler : public std::enable_shared_from_this public: rpc_handler (nano::rpc_config const & rpc_config, std::string const & body_a, std::string const & request_id_a, std::function const & response_a, nano::rpc_handler_interface & rpc_handler_interface_a, nano::logger_mt & logger); void process_request (); - void read (std::shared_ptr> req, std::shared_ptr> res, const std::string & action); private: std::string body; diff --git a/nano/rpc/rpc_secure.cpp b/nano/rpc/rpc_secure.cpp index 6ac476c12..df13ea450 100644 --- a/nano/rpc/rpc_secure.cpp +++ b/nano/rpc/rpc_secure.cpp @@ -101,7 +101,7 @@ ssl_context (boost::asio::ssl::context::tlsv12_server) void nano::rpc_secure::accept () { auto connection (std::make_shared (config, io_ctx, logger, rpc_handler_interface, this->ssl_context)); - acceptor.async_accept (connection->socket, [this, connection](boost::system::error_code const & ec) { + acceptor.async_accept (connection->socket, boost::asio::bind_executor (connection->strand, [this, connection](boost::system::error_code const & ec) { if (acceptor.is_open ()) { accept (); @@ -114,5 +114,5 @@ void nano::rpc_secure::accept () { logger.always_log (boost::str (boost::format ("Error accepting RPC connections: %1%") % ec)); } - }); + })); }