Support external RPC servers via IPC (#1434)

* Support external RPC servers via IPC

* Warning fix and rebase of moved config option

* Fixed merge error

* Lifetime fix for action handlers with async response

* Chrono for timeouts, more consistent arg names, docs

* Convert bind to lambda, use nano::timer, id dispenser and misc improvements

* Configuration upgrade

* Config upgrade and ipc logging flag

* log_ipc flag, reserved bytes in preamble, misc cleanup

* Add ipc client and tests

* Compile-time OS  guard for domain sockets

* Remove debug catch, const string refs where applicable

* Destructor and cast fixes, use version 16 in nodeconfig

* Virtual dtor in ipc_config_transport

* Typo fix and final specifier
This commit is contained in:
cryptocode 2019-01-28 21:22:07 +01:00 committed by GitHub
commit f492cfc250
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 1001 additions and 24 deletions

View file

@ -3,6 +3,7 @@ add_executable (core_test
block.cpp
block_store.cpp
interface.cpp
ipc.cpp
conflicts.cpp
daemon.cpp
entry.cpp

87
nano/core_test/ipc.cpp Normal file
View file

@ -0,0 +1,87 @@
#include <boost/property_tree/json_parser.hpp>
#include <chrono>
#include <gtest/gtest.h>
#include <memory>
#include <nano/core_test/testutil.hpp>
#include <nano/node/ipc.hpp>
#include <nano/node/rpc.hpp>
#include <nano/node/testing.hpp>
#include <sstream>
#include <vector>
using namespace std::chrono_literals;
TEST (ipc, asynchronous)
{
nano::system system (24000, 1);
nano::rpc rpc (system.io_ctx, *system.nodes[0], nano::rpc_config (true));
system.nodes[0]->config.ipc_config.transport_tcp.enabled = true;
system.nodes[0]->config.ipc_config.transport_tcp.port = 24077;
nano::ipc::ipc_server ipc (*system.nodes[0], rpc);
nano::ipc::ipc_client client (system.nodes[0]->io_ctx);
auto req (client.prepare_request (nano::ipc::payload_encoding::json_legacy, std::string (R"({"action": "block_count"})")));
auto res (std::make_shared<std::vector<uint8_t>> ());
std::atomic<bool> call_completed{ false };
client.async_connect ("::1", 24077, [&client, &req, &res, &call_completed](nano::error err) {
client.async_write (req, [&client, &req, &res, &call_completed](nano::error err_a, size_t size_a) {
ASSERT_NO_ERROR (static_cast<std::error_code> (err_a));
ASSERT_EQ (size_a, req->size ());
// Read length
client.async_read (res, sizeof (uint32_t), [&client, &res, &call_completed](nano::error err_read_a, size_t size_read_a) {
ASSERT_NO_ERROR (static_cast<std::error_code> (err_read_a));
ASSERT_EQ (size_read_a, sizeof (uint32_t));
uint32_t payload_size_l = boost::endian::big_to_native (*reinterpret_cast<uint32_t *> (res->data ()));
// Read json payload
client.async_read (res, payload_size_l, [&res, &call_completed](nano::error err_read_a, size_t size_read_a) {
std::string payload (res->begin (), res->end ());
std::stringstream ss;
ss << payload;
// Make sure the response is valid json
boost::property_tree::ptree blocks;
boost::property_tree::read_json (ss, blocks);
ASSERT_EQ (blocks.get<int> ("count"), 1);
call_completed = true;
});
});
});
});
system.deadline_set (5s);
while (!call_completed)
{
ASSERT_NO_ERROR (system.poll ());
}
}
TEST (ipc, synchronous)
{
nano::system system (24000, 1);
nano::rpc rpc (system.io_ctx, *system.nodes[0], nano::rpc_config (true));
system.nodes[0]->config.ipc_config.transport_tcp.enabled = true;
system.nodes[0]->config.ipc_config.transport_tcp.port = 24077;
nano::ipc::ipc_server ipc (*system.nodes[0], rpc);
nano::ipc::rpc_ipc_client client (system.nodes[0]->io_ctx);
// Start blocking IPC client in a separate thread
std::atomic<bool> call_completed{ false };
std::thread client_thread ([&client, &call_completed]() {
client.connect ("::1", 24077);
std::string response (client.request (std::string (R"({"action": "block_count"})")));
std::stringstream ss;
ss << response;
// Make sure the response is valid json
boost::property_tree::ptree blocks;
boost::property_tree::read_json (ss, blocks);
ASSERT_EQ (blocks.get<int> ("count"), 1);
call_completed = true;
});
client_thread.detach ();
system.deadline_set (5s);
while (!call_completed)
{
ASSERT_NO_ERROR (system.poll ());
}
}

View file

@ -1,10 +1,10 @@
#include <nano/lib/jsonconfig.hpp>
#include <nano/lib/utility.hpp>
#include <nano/nano_node/daemon.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <fstream>
#include <iostream>
#include <nano/lib/jsonconfig.hpp>
#include <nano/lib/utility.hpp>
#include <nano/nano_node/daemon.hpp>
#include <nano/node/ipc.hpp>
#include <nano/node/working.hpp>
nano_daemon::daemon_config::daemon_config () :
@ -139,10 +139,11 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano::
node->flags = flags;
node->start ();
std::unique_ptr<nano::rpc> rpc = get_rpc (io_ctx, *node, config.rpc);
if (rpc && config.rpc_enable)
if (rpc)
{
rpc->start ();
rpc->start (config.rpc_enable);
}
nano::ipc::ipc_server ipc (*node, *rpc);
runner = std::make_unique<nano::thread_runner> (io_ctx, node->config.io_threads);
runner->join ();
}

View file

@ -3,6 +3,7 @@
#include <nano/lib/utility.hpp>
#include <nano/nano_wallet/icon.hpp>
#include <nano/node/cli.hpp>
#include <nano/node/ipc.hpp>
#include <nano/node/rpc.hpp>
#include <nano/node/working.hpp>
#include <nano/qt/qt.hpp>
@ -277,12 +278,14 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost
update_config (config, config_path);
node->start ();
std::unique_ptr<nano::rpc> rpc = get_rpc (io_ctx, *node, config.rpc);
if (rpc && config.rpc_enable)
if (rpc)
{
rpc->start ();
rpc->start (config.rpc_enable);
}
nano::ipc::ipc_server ipc (*node, *rpc);
nano::thread_runner runner (io_ctx, node->config.io_threads);
QObject::connect (&application, &QApplication::aboutToQuit, [&]() {
ipc.stop ();
rpc->stop ();
node->stop ();
});

View file

@ -23,6 +23,8 @@ add_library (node
cli.cpp
common.cpp
common.hpp
ipc.hpp
ipc.cpp
lmdb.cpp
lmdb.hpp
logging.cpp

672
nano/node/ipc.cpp Normal file
View file

@ -0,0 +1,672 @@
#include <algorithm>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/endian/conversion.hpp>
#include <boost/polymorphic_cast.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/thread/thread_time.hpp>
#include <chrono>
#include <cstdio>
#include <fstream>
#include <future>
#include <iostream>
#include <nano/lib/timer.hpp>
#include <nano/node/common.hpp>
#include <nano/node/ipc.hpp>
#include <nano/node/node.hpp>
#include <nano/node/rpc.hpp>
#include <thread>
using namespace boost::log;
namespace
{
/**
* The IPC framing format is simple: preamble followed by an encoding specific payload.
* Preamble is uint8_t {'N', encoding_type, reserved, reserved}. Reserved bytes MUST be zero.
* @note This is intentionally not an enum class as the values are only used as vector indices.
*/
enum preamble_offset
{
/** Always 'N' */
lead = 0,
/** One of the payload_encoding values */
encoding = 1,
/** Always zero */
reserved_1 = 2,
/** Always zero */
reserved_2 = 3,
};
}
nano::error nano::ipc::ipc_config::serialize_json (nano::jsonconfig & json) const
{
nano::jsonconfig tcp_l;
// Only write out experimental config values if they're previously set explicitly in the config file
if (transport_tcp.io_threads >= 0)
{
tcp_l.put ("io_threads", transport_tcp.io_threads);
}
tcp_l.put ("enable", transport_tcp.enabled);
tcp_l.put ("port", transport_tcp.port);
tcp_l.put ("io_timeout", transport_tcp.io_timeout);
json.put_child ("tcp", tcp_l);
nano::jsonconfig domain_l;
if (transport_domain.io_threads >= 0)
{
domain_l.put ("io_threads", transport_domain.io_threads);
}
domain_l.put ("enable", transport_domain.enabled);
domain_l.put ("path", transport_domain.path);
domain_l.put ("io_timeout", transport_domain.io_timeout);
json.put_child ("local", domain_l);
return json.get_error ();
}
nano::error nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & json)
{
auto tcp_l (json.get_optional_child ("tcp"));
if (tcp_l)
{
tcp_l->get_optional<long> ("io_threads", transport_tcp.io_threads, -1);
tcp_l->get<bool> ("enable", transport_tcp.enabled);
tcp_l->get<uint16_t> ("port", transport_tcp.port);
tcp_l->get<size_t> ("io_timeout", transport_tcp.io_timeout);
}
auto domain_l (json.get_optional_child ("local"));
if (domain_l)
{
domain_l->get_optional<long> ("io_threads", transport_domain.io_threads, -1);
domain_l->get<bool> ("enable", transport_domain.enabled);
domain_l->get<std::string> ("path", transport_domain.path);
domain_l->get<size_t> ("io_timeout", transport_domain.io_timeout);
}
return json.get_error ();
}
/** Abstract base type for sockets, implementing timer logic and a close operation */
class socket_base
{
public:
socket_base (boost::asio::io_context & io_ctx_a) :
io_timer (io_ctx_a)
{
}
virtual ~socket_base () = default;
/** Close socket */
virtual void close () = 0;
/**
* Start IO timer.
* @param timeout_a Seconds to wait. To wait indefinitely, use std::chrono::seconds::max ()
*/
void timer_start (std::chrono::seconds timeout_a)
{
if (timeout_a < std::chrono::seconds::max ())
{
io_timer.expires_from_now (boost::posix_time::seconds (timeout_a.count ()));
io_timer.async_wait ([this](const boost::system::error_code & ec) {
if (!ec)
{
this->timer_expired ();
}
});
}
}
void timer_expired ()
{
close ();
}
void timer_cancel ()
{
boost::system::error_code ec;
io_timer.cancel (ec);
assert (!ec);
}
private:
/** IO operation timer */
boost::asio::deadline_timer io_timer;
};
/**
* A session represents an inbound connection over which multiple requests/reponses are transmitted.
*/
template <typename SOCKET_TYPE>
class session : public socket_base, public std::enable_shared_from_this<session<SOCKET_TYPE>>
{
public:
session (nano::ipc::ipc_server & server_a, boost::asio::io_context & io_ctx_a, nano::ipc::ipc_config_transport & config_transport_a) :
socket_base (io_ctx_a),
server (server_a), node (server_a.node), session_id (server_a.id_dispenser.fetch_add (1)), io_ctx (io_ctx_a), socket (io_ctx_a), config_transport (config_transport_a)
{
if (node.config.logging.log_ipc ())
{
BOOST_LOG (node.log) << "IPC: created session with id: " << session_id;
}
}
SOCKET_TYPE & get_socket ()
{
return socket;
}
/**
* Async read of exactly \p size_a bytes. The callback is invoked only when all the data is available and
* no error has occurred. On error, the error is logged, the read cycle stops and the session ends. Clients
* are expected to implement reconnect logic.
*/
void async_read_exactly (void * buff_a, size_t size_a, std::function<void()> callback_a)
{
async_read_exactly (buff_a, size_a, std::chrono::seconds (config_transport.io_timeout), callback_a);
}
/**
* Async read of exactly \p size_a bytes and a specific \p timeout_a.
* @see async_read_exactly (void *, size_t, std::function<void()>)
*/
void async_read_exactly (void * buff_a, size_t size_a, std::chrono::seconds timeout_a, std::function<void()> callback_a)
{
timer_start (timeout_a);
auto this_l (this->shared_from_this ());
boost::asio::async_read (socket,
boost::asio::buffer (buff_a, size_a),
boost::asio::transfer_exactly (size_a),
[this_l, callback_a](boost::system::error_code const & ec, size_t bytes_transferred_a) {
this_l->timer_cancel ();
if (ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset)
{
if (this_l->node.config.logging.log_ipc ())
{
BOOST_LOG (this_l->node.log) << boost::str (boost::format ("IPC: error reading %1% ") % ec.message ());
}
}
else if (bytes_transferred_a > 0)
{
callback_a ();
}
});
}
/** Handler for payload_encoding::json_legacy */
void rpc_handle_query ()
{
session_timer.restart ();
auto request_id_l (std::to_string (server.id_dispenser.fetch_add (1)));
// This is called when nano::rpc_handler#process_request is done. We convert to
// json and write the response to the ipc socket with a length prefix.
auto this_l (this->shared_from_this ());
auto response_handler_l ([this_l, request_id_l](boost::property_tree::ptree const & tree_a) {
std::stringstream ostream;
boost::property_tree::write_json (ostream, tree_a);
ostream.flush ();
std::string response_body = ostream.str ();
uint32_t size_response = boost::endian::native_to_big (static_cast<uint32_t> (response_body.size ()));
std::vector<boost::asio::mutable_buffer> bufs = {
boost::asio::buffer (&size_response, sizeof (size_response)),
boost::asio::buffer (response_body)
};
this_l->timer_start (std::chrono::seconds (this_l->config_transport.io_timeout));
boost::asio::async_write (this_l->socket, bufs, [this_l](boost::system::error_code const & error_a, size_t size_a) {
this_l->timer_cancel ();
if (!error_a)
{
this_l->read_next_request ();
}
else if (this_l->node.config.logging.log_ipc ())
{
BOOST_LOG (this_l->node.log) << "IPC: Write failed: " << error_a.message ();
}
});
if (this_l->node.config.logging.log_ipc ())
{
BOOST_LOG (this_l->node.log) << boost::str (boost::format ("IPC/RPC request %1% completed in: %2% %3%") % request_id_l % this_l->session_timer.stop ().count () % this_l->session_timer.unit ());
}
});
node.stats.inc (nano::stat::type::ipc, nano::stat::detail::invocations);
auto body (std::string (reinterpret_cast<char *> (buffer.data ()), buffer.size ()));
// Note that if the rpc action is async, the shared_ptr<rpc_handler> lifetime will be extended by the action handler
nano::rpc_handler handler (node, server.rpc, body, request_id_l, response_handler_l);
handler.process_request ();
}
/** Async request reader */
void read_next_request ()
{
auto this_l = this->shared_from_this ();
// Await next request indefinitely
buffer.resize (sizeof (buffer_size));
async_read_exactly (buffer.data (), buffer.size (), std::chrono::seconds::max (), [this_l]() {
if (this_l->buffer[preamble_offset::lead] != 'N' || this_l->buffer[preamble_offset::reserved_1] != 0 || this_l->buffer[preamble_offset::reserved_2] != 0)
{
if (this_l->node.config.logging.log_ipc ())
{
BOOST_LOG (this_l->node.log) << "IPC: Invalid preamble";
}
}
else if (this_l->buffer[preamble_offset::encoding] == static_cast<uint8_t> (nano::ipc::payload_encoding::json_legacy))
{
// Length of payload
this_l->async_read_exactly (&this_l->buffer_size, sizeof (this_l->buffer_size), [this_l]() {
boost::endian::big_to_native_inplace (this_l->buffer_size);
this_l->buffer.resize (this_l->buffer_size);
// Payload (ptree compliant JSON string)
this_l->async_read_exactly (this_l->buffer.data (), this_l->buffer_size, [this_l]() {
this_l->rpc_handle_query ();
});
});
}
else if (this_l->node.config.logging.log_ipc ())
{
BOOST_LOG (this_l->node.log) << "IPC: Unsupported payload encoding";
}
});
}
/** Shut down and close socket */
void close ()
{
socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both);
socket.close ();
}
private:
nano::ipc::ipc_server & server;
nano::node & node;
/** Unique session id used for logging */
uint64_t session_id;
/** Timer for measuring the duration of ipc calls */
nano::timer<std::chrono::microseconds> session_timer;
/**
* IO context from node, or per-transport, depending on configuration.
* Certain transports may scale better if they use a separate context.
*/
boost::asio::io_context & io_ctx;
/** A socket of the given asio type */
SOCKET_TYPE socket;
/** Buffer sizes are read into this */
uint32_t buffer_size{ 0 };
/** Buffer used to store data received from the client */
std::vector<uint8_t> buffer;
/** Transport configuration */
nano::ipc::ipc_config_transport & config_transport;
};
/** Domain and TCP socket transport */
template <typename ACCEPTOR_TYPE, typename SOCKET_TYPE, typename ENDPOINT_TYPE>
class socket_transport : public nano::ipc::transport
{
public:
socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE endpoint_a, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) :
server (server_a), config_transport (config_transport_a)
{
// Using a per-transport event dispatcher?
if (concurrency_a > 0)
{
io_ctx = std::make_unique<boost::asio::io_context> ();
}
boost::asio::socket_base::reuse_address option (true);
boost::asio::socket_base::keep_alive option_keepalive (true);
acceptor = std::make_unique<ACCEPTOR_TYPE> (context (), endpoint_a);
acceptor->set_option (option);
acceptor->set_option (option_keepalive);
accept ();
// Start serving IO requests. If concurrency_a is < 1, the node's thread pool/io_context is used instead.
// A separate io_context for domain sockets may facilitate better performance on some systems.
if (concurrency_a > 0)
{
runner = std::make_unique<nano::thread_runner> (*io_ctx, concurrency_a);
}
}
boost::asio::io_context & context () const
{
return io_ctx ? *io_ctx : server.node.io_ctx;
}
void accept ()
{
// Prepare the next session
auto new_session (std::make_shared<session<SOCKET_TYPE>> (server, context (), config_transport));
acceptor->async_accept (new_session->get_socket (), [this, new_session](boost::system::error_code const & ec) {
if (!ec)
{
new_session->read_next_request ();
}
else
{
BOOST_LOG (server.node.log) << "IPC: acceptor error: " << ec.message ();
}
if (acceptor->is_open () && ec != boost::asio::error::operation_aborted)
{
this->accept ();
}
else
{
BOOST_LOG (server.node.log) << "IPC: shutting down";
}
});
}
void stop ()
{
acceptor->close ();
if (io_ctx)
{
io_ctx->stop ();
}
if (runner)
{
runner->join ();
}
}
private:
nano::ipc::ipc_server & server;
nano::ipc::ipc_config_transport & config_transport;
std::unique_ptr<nano::thread_runner> runner;
std::unique_ptr<boost::asio::io_context> io_ctx;
std::unique_ptr<ACCEPTOR_TYPE> acceptor;
};
/** The domain socket file is attempted removed at both startup and shutdown. */
class nano::ipc::dsock_file_remover final
{
public:
dsock_file_remover (std::string const & file_a) :
filename (file_a)
{
std::remove (filename.c_str ());
}
~dsock_file_remover ()
{
std::remove (filename.c_str ());
}
std::string filename;
};
nano::ipc::ipc_server::ipc_server (nano::node & node_a, nano::rpc & rpc_a) :
node (node_a), rpc (rpc_a)
{
try
{
if (node_a.config.ipc_config.transport_domain.enabled)
{
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
size_t threads = node_a.config.ipc_config.transport_domain.io_threads;
file_remover = std::make_unique<dsock_file_remover> (node_a.config.ipc_config.transport_domain.path);
boost::asio::local::stream_protocol::endpoint ep{ node_a.config.ipc_config.transport_domain.path };
transports.push_back (std::make_shared<socket_transport<boost::asio::local::stream_protocol::acceptor, boost::asio::local::stream_protocol::socket, boost::asio::local::stream_protocol::endpoint>> (*this, ep, node_a.config.ipc_config.transport_domain, threads));
#else
BOOST_LOG (node.log) << "IPC: Domain sockets are not supported on this platform";
#endif
}
if (node_a.config.ipc_config.transport_tcp.enabled)
{
size_t threads = node_a.config.ipc_config.transport_tcp.io_threads;
transports.push_back (std::make_shared<socket_transport<boost::asio::ip::tcp::acceptor, boost::asio::ip::tcp::socket, boost::asio::ip::tcp::endpoint>> (*this, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), node_a.config.ipc_config.transport_tcp.port), node_a.config.ipc_config.transport_tcp, threads));
}
BOOST_LOG (node.log) << "IPC: server started";
}
catch (std::runtime_error const & ex)
{
BOOST_LOG (node.log) << "IPC: " << ex.what ();
}
}
nano::ipc::ipc_server::~ipc_server ()
{
BOOST_LOG (node.log) << "IPC: server stopped";
}
void nano::ipc::ipc_server::stop ()
{
for (auto & transport : transports)
{
transport->stop ();
}
}
/** Socket agnostic IO interface */
class channel
{
public:
virtual void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) = 0;
virtual void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) = 0;
};
/** Domain and TCP client socket */
template <typename SOCKET_TYPE, typename ENDPOINT_TYPE>
class socket_client : public socket_base, public channel
{
public:
socket_client (boost::asio::io_context & io_ctx_a, ENDPOINT_TYPE endpoint_a) :
socket_base (io_ctx_a), endpoint (endpoint_a), socket (io_ctx_a), resolver (io_ctx_a)
{
}
void async_resolve (std::string const & host_a, uint16_t port_a, std::function<void(boost::system::error_code const &, boost::asio::ip::tcp::endpoint)> callback_a)
{
this->timer_start (io_timeout);
resolver.async_resolve (boost::asio::ip::tcp::resolver::query (host_a, std::to_string (port_a)), [this, callback_a](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator endpoint_iterator_a) {
this->timer_cancel ();
boost::asio::ip::tcp::resolver::iterator end;
if (!ec && endpoint_iterator_a != end)
{
endpoint = *endpoint_iterator_a;
callback_a (ec, *endpoint_iterator_a);
}
else
{
callback_a (ec, *end);
}
});
}
void async_connect (std::function<void(boost::system::error_code const &)> callback_a)
{
this->timer_start (io_timeout);
socket.async_connect (endpoint, [this, callback_a](boost::system::error_code const & ec) {
this->timer_cancel ();
callback_a (ec);
});
}
void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) override
{
this->timer_start (io_timeout);
buffer_a->resize (size_a);
boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), [this, callback_a](boost::system::error_code const & ec, size_t size_a) {
this->timer_cancel ();
callback_a (ec, size_a);
});
}
void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) override
{
this->timer_start (io_timeout);
boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) {
this->timer_cancel ();
callback_a (ec, size_a);
});
}
/** Shut down and close socket */
void close () override
{
socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both);
socket.close ();
}
private:
ENDPOINT_TYPE endpoint;
SOCKET_TYPE socket;
boost::asio::ip::tcp::resolver resolver;
std::chrono::seconds io_timeout{ 60 };
};
/**
* PIMPL class for ipc_client. This ensures that socket_client and boost details can
* stay out of the header file.
*/
class client_impl : public nano::ipc::ipc_client_impl
{
public:
client_impl (boost::asio::io_context & io_ctx_a) :
io_ctx (io_ctx_a)
{
}
void connect (std::string const & host_a, uint16_t port_a, std::function<void(nano::error)> callback_a)
{
tcp_client = std::make_shared<socket_client<boost::asio::ip::tcp::socket, boost::asio::ip::tcp::endpoint>> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a));
tcp_client->async_resolve (host_a, port_a, [this, callback_a](boost::system::error_code const & ec_resolve_a, boost::asio::ip::tcp::endpoint endpoint_a) {
if (!ec_resolve_a)
{
this->tcp_client->async_connect ([callback_a](const boost::system::error_code & ec_connect_a) {
callback_a (nano::error (ec_connect_a));
});
}
else
{
callback_a (nano::error (ec_resolve_a));
}
});
}
nano::error connect (std::string const & path_a)
{
nano::error err;
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
domain_client = std::make_shared<socket_client<boost::asio::local::stream_protocol::socket, boost::asio::local::stream_protocol::endpoint>> (io_ctx, boost::asio::local::stream_protocol::endpoint (path_a));
#else
err = nano::error ("Domain sockets are not supported by this platform");
#endif
return err;
}
channel & get_channel ()
{
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
return tcp_client ? static_cast<channel &> (*tcp_client) : static_cast<channel &> (*domain_client);
#else
return static_cast<channel &> (*tcp_client);
#endif
}
private:
boost::asio::io_context & io_ctx;
std::shared_ptr<socket_client<boost::asio::ip::tcp::socket, boost::asio::ip::tcp::endpoint>> tcp_client;
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
std::shared_ptr<socket_client<boost::asio::local::stream_protocol::socket, boost::asio::local::stream_protocol::endpoint>> domain_client;
#endif
};
nano::ipc::ipc_client::ipc_client (boost::asio::io_context & io_ctx_a) :
io_ctx (io_ctx_a)
{
}
nano::error nano::ipc::ipc_client::connect (std::string const & path_a)
{
impl = std::make_unique<client_impl> (io_ctx);
return boost::polymorphic_downcast<client_impl *> (impl.get ())->connect (path_a);
}
void nano::ipc::ipc_client::async_connect (std::string const & host_a, uint16_t port_a, std::function<void(nano::error)> callback_a)
{
impl = std::make_unique<client_impl> (io_ctx);
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
client->connect (host_a, port_a, callback_a);
}
nano::error nano::ipc::ipc_client::connect (std::string const & host, uint16_t port)
{
std::promise<nano::error> result_l;
async_connect (host, port, [&result_l](nano::error err_a) {
result_l.set_value (err_a);
});
return result_l.get_future ().get ();
}
void nano::ipc::ipc_client::async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(nano::error, size_t)> callback_a)
{
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
client->get_channel ().async_write (buffer_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_written_a) {
callback_a (nano::error (ec_a), bytes_written_a);
});
}
void nano::ipc::ipc_client::async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(nano::error, size_t)> callback_a)
{
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
client->get_channel ().async_read (buffer_a, size_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_read_a) {
callback_a (nano::error (ec_a), bytes_read_a);
});
}
std::shared_ptr<std::vector<uint8_t>> nano::ipc::ipc_client::prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a)
{
auto buffer_l (std::make_shared<std::vector<uint8_t>> ());
if (encoding_a == nano::ipc::payload_encoding::json_legacy)
{
buffer_l->push_back ('N');
buffer_l->push_back (static_cast<uint8_t> (encoding_a));
buffer_l->push_back (0);
buffer_l->push_back (0);
uint32_t payload_length = payload_a.size ();
uint32_t be = boost::endian::native_to_big (payload_length);
char * chars = reinterpret_cast<char *> (&be);
buffer_l->insert (buffer_l->end (), chars, chars + sizeof (uint32_t));
buffer_l->insert (buffer_l->end (), payload_a.begin (), payload_a.end ());
}
return buffer_l;
}
std::string nano::ipc::rpc_ipc_client::request (std::string const & rpc_action_a)
{
auto req (prepare_request (nano::ipc::payload_encoding::json_legacy, rpc_action_a));
auto res (std::make_shared<std::vector<uint8_t>> ());
std::promise<std::string> result_l;
async_write (req, [this, &res, &result_l](nano::error err_a, size_t size_a) {
// Read length
this->async_read (res, sizeof (uint32_t), [this, &res, &result_l](nano::error err_read_a, size_t size_read_a) {
uint32_t payload_size_l = boost::endian::big_to_native (*reinterpret_cast<uint32_t *> (res->data ()));
// Read json payload
this->async_read (res, payload_size_l, [&res, &result_l](nano::error err_read_a, size_t size_read_a) {
result_l.set_value (std::string (res->begin (), res->end ()));
});
});
});
return result_l.get_future ().get ();
}

155
nano/node/ipc.hpp Normal file
View file

@ -0,0 +1,155 @@
#pragma once
#include <atomic>
#include <boost/asio.hpp>
#include <boost/property_tree/ptree.hpp>
#include <nano/lib/errors.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <string>
#include <vector>
namespace nano
{
class node;
class rpc;
}
namespace nano
{
namespace ipc
{
/**
* Payload encodings; add protobuf, flatbuffers and so on as needed.
*/
enum class payload_encoding : uint8_t
{
/**
* Request is preamble followed by 32-bit BE payload length and payload bytes.
* Response is 32-bit BE payload length followed by payload bytes.
*/
json_legacy = 1
};
/** Removes domain socket files on startup and shutdown */
class dsock_file_remover;
/** IPC transport interface */
class transport
{
public:
virtual void stop () = 0;
virtual ~transport () = default;
};
/** Base class for transport configurations */
class ipc_config_transport
{
public:
virtual ~ipc_config_transport () = default;
bool enabled{ false };
size_t io_timeout{ 15 };
long io_threads{ -1 };
};
/** Domain socket specific transport config */
class ipc_config_domain_socket : public ipc_config_transport
{
public:
/**
* Default domain socket path for Unix systems. Once Boost supports Windows 10 usocks,
* this value will be conditional on OS.
*/
std::string path{ "/tmp/nano" };
};
/** TCP specific transport config */
class ipc_config_tcp_socket : public ipc_config_transport
{
public:
/** Listening port */
uint16_t port{ 7077 };
};
/** IPC configuration */
class ipc_config
{
public:
nano::error deserialize_json (nano::jsonconfig & json_a);
nano::error serialize_json (nano::jsonconfig & json) const;
ipc_config_domain_socket transport_domain;
ipc_config_tcp_socket transport_tcp;
};
/** The IPC server accepts connections on one or more configured transports */
class ipc_server
{
public:
ipc_server (nano::node & node, nano::rpc & rpc);
virtual ~ipc_server ();
void stop ();
nano::node & node;
nano::rpc & rpc;
/** Unique counter/id shared across sessions */
std::atomic<uint64_t> id_dispenser{ 0 };
private:
std::unique_ptr<dsock_file_remover> file_remover;
std::vector<std::shared_ptr<nano::ipc::transport>> transports;
};
class ipc_client_impl
{
public:
virtual ~ipc_client_impl () = default;
};
/** IPC client */
class ipc_client
{
public:
ipc_client (boost::asio::io_context & io_ctx_a);
virtual ~ipc_client () = default;
/** Connect to a domain socket */
nano::error connect (std::string const & path);
/** Connect to a tcp socket synchronously */
nano::error connect (std::string const & host, uint16_t port);
/** Connect to a tcp socket asynchronously */
void async_connect (std::string const & host, uint16_t port, std::function<void(nano::error)> callback);
/** Write buffer asynchronously */
void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(nano::error, size_t)> callback_a);
/** Read \p size_a bytes asynchronously */
void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(nano::error, size_t)> callback_a);
/**
* Returns a buffer with an IPC preamble for the given \p encoding_a followed by the payload. Depending on encoding,
* the buffer may contain a payload length or end sentinel.
*/
std::shared_ptr<std::vector<uint8_t>> prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a);
private:
boost::asio::io_context & io_ctx;
// PIMPL pattern to hide implementation details
std::unique_ptr<ipc_client_impl> impl;
};
/** Convenience wrapper for making synchronous RPC calls via IPC */
class rpc_ipc_client : public ipc_client
{
public:
rpc_ipc_client (boost::asio::io_context & io_ctx_a) :
ipc_client (io_ctx_a)
{
}
/** Calls the RPC server via IPC and waits for the result. The client must be connected. */
std::string request (std::string const & rpc_action_a);
};
}
}

View file

@ -18,6 +18,7 @@ network_node_id_handshake_logging_value (false),
node_lifetime_tracing_value (false),
insufficient_work_logging_value (true),
log_rpc_value (true),
log_ipc_value (true),
bulk_pull_logging_value (false),
work_generation_time_value (true),
upnp_details_logging_value (false),
@ -58,6 +59,7 @@ nano::error nano::logging::serialize_json (nano::jsonconfig & json) const
json.put ("node_lifetime_tracing", node_lifetime_tracing_value);
json.put ("insufficient_work", insufficient_work_logging_value);
json.put ("log_rpc", log_rpc_value);
json.put ("log_ipc", log_ipc_value);
json.put ("bulk_pull", bulk_pull_logging_value);
json.put ("work_generation_time", work_generation_time_value);
json.put ("upnp_details", upnp_details_logging_value);
@ -94,6 +96,7 @@ bool nano::logging::upgrade_json (unsigned version_a, nano::jsonconfig & json)
json.get<uintmax_t> ("max_size", config_max_size);
max_size = std::max (max_size, config_max_size);
json.put ("max_size", max_size);
json.put ("log_ipc", true);
upgraded_l = true;
case 6:
break;
@ -138,6 +141,7 @@ nano::error nano::logging::deserialize_json (bool & upgraded_a, nano::jsonconfig
json.get<bool> ("node_lifetime_tracing", node_lifetime_tracing_value);
json.get<bool> ("insufficient_work", insufficient_work_logging_value);
json.get<bool> ("log_rpc", log_rpc_value);
json.get<bool> ("log_ipc", log_ipc_value);
json.get<bool> ("bulk_pull", bulk_pull_logging_value);
json.get<bool> ("work_generation_time", work_generation_time_value);
json.get<bool> ("upnp_details", upnp_details_logging_value);
@ -210,6 +214,11 @@ bool nano::logging::log_rpc () const
return network_logging () && log_rpc_value;
}
bool nano::logging::log_ipc () const
{
return network_logging () && log_ipc_value;
}
bool nano::logging::bulk_pull_logging () const
{
return network_logging () && bulk_pull_logging_value;

View file

@ -32,6 +32,7 @@ public:
bool upnp_details_logging () const;
bool timing_logging () const;
bool log_rpc () const;
bool log_ipc () const;
bool bulk_pull_logging () const;
bool callback_logging () const;
bool work_generation_time () const;
@ -50,6 +51,7 @@ public:
bool node_lifetime_tracing_value;
bool insufficient_work_logging_value;
bool log_rpc_value;
bool log_ipc_value;
bool bulk_pull_logging_value;
bool work_generation_time_value;
bool upnp_details_logging_value;

View file

@ -115,6 +115,11 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const
json.put ("lmdb_max_dbs", lmdb_max_dbs);
json.put ("block_processor_batch_max_time", block_processor_batch_max_time.count ());
json.put ("allow_local_peers", allow_local_peers);
nano::jsonconfig ipc_l;
ipc_config.serialize_json (ipc_l);
json.put_child ("ipc", ipc_l);
return json.get_error ();
}
@ -222,6 +227,10 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso
});
json.replace_child (preconfigured_peers_key, peers);
nano::jsonconfig ipc_l;
ipc_config.serialize_json (ipc_l);
json.put_child ("ipc", ipc_l);
upgraded = true;
}
case 16:
@ -315,6 +324,12 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco
auto block_processor_batch_max_time_l (json.get<unsigned long> ("block_processor_batch_max_time"));
block_processor_batch_max_time = std::chrono::milliseconds (block_processor_batch_max_time_l);
auto ipc_config_l (json.get_optional_child ("ipc"));
if (ipc_config_l)
{
ipc_config.deserialize_json (ipc_config_l.get ());
}
json.get<uint16_t> ("peering_port", peering_port);
json.get<unsigned> ("bootstrap_fraction_numerator", bootstrap_fraction_numerator);
json.get<unsigned> ("online_weight_quorum", online_weight_quorum);

View file

@ -4,6 +4,7 @@
#include <nano/lib/errors.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/node/ipc.hpp>
#include <nano/node/logging.hpp>
#include <nano/node/stats.hpp>
#include <vector>
@ -44,6 +45,7 @@ public:
int lmdb_max_dbs;
bool allow_local_peers;
nano::stat_config stat_config;
nano::ipc::ipc_config ipc_config;
nano::uint256_union epoch_block_link;
nano::account epoch_block_signer;
std::chrono::milliseconds block_processor_batch_max_time;

View file

@ -87,26 +87,38 @@ node (node_a)
{
}
void nano::rpc::start ()
void nano::rpc::add_block_observer ()
{
auto endpoint (nano::tcp_endpoint (config.address, config.port));
acceptor.open (endpoint.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
boost::system::error_code ec;
acceptor.bind (endpoint, ec);
if (ec)
{
BOOST_LOG (node.log) << boost::str (boost::format ("Error while binding for RPC on port %1%: %2%") % endpoint.port () % ec.message ());
throw std::runtime_error (ec.message ());
}
acceptor.listen ();
node.observers.blocks.add ([this](std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::uint128_t const &, bool) {
observer_action (account_a);
});
}
accept ();
void nano::rpc::start (bool rpc_enabled_a)
{
if (rpc_enabled_a)
{
auto endpoint (nano::tcp_endpoint (config.address, config.port));
acceptor.open (endpoint.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
boost::system::error_code ec;
acceptor.bind (endpoint, ec);
if (ec)
{
BOOST_LOG (node.log) << boost::str (boost::format ("Error while binding for RPC on port %1%: %2%") % endpoint.port () % ec.message ());
throw std::runtime_error (ec.message ());
}
acceptor.listen ();
}
add_block_observer ();
if (rpc_enabled_a)
{
accept ();
}
}
void nano::rpc::accept ()

View file

@ -71,7 +71,13 @@ class rpc
public:
rpc (boost::asio::io_context &, nano::node &, nano::rpc_config const &);
virtual ~rpc () = default;
void start ();
/**
* Start serving RPC requests if \p rpc_enabled_a, otherwise this will only
* add a block observer since requests may still arrive via IPC.
*/
void start (bool rpc_enabled_a = true);
void add_block_observer ();
virtual void accept ();
void stop ();
void observer_action (nano::account const &);

View file

@ -316,6 +316,9 @@ std::string nano::stat::type_to_string (uint32_t key)
std::string res;
switch (type)
{
case nano::stat::type::ipc:
res = "ipc";
break;
case nano::stat::type::block:
res = "block";
break;
@ -413,6 +416,9 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::insufficient_work:
res = "insufficient_work";
break;
case nano::stat::detail::invocations:
res = "invocations";
break;
case nano::stat::detail::keepalive:
res = "keepalive";
break;

View file

@ -188,6 +188,7 @@ public:
vote,
http_callback,
peering,
ipc,
udp
};
@ -250,6 +251,9 @@ public:
invalid_node_id_handshake_message,
outdated_version,
// ipc
invocations,
// peering
handshake,
};