Refactor preparation for moving RPC out of process (#1872)

* Move things around preparing for RPC out of process

* Add rpc_test executable for testing rpc commands

* Add rpc_test to ci

* Formatting
This commit is contained in:
Wesley Shillingford 2019-03-28 08:18:52 +00:00 committed by GitHub
commit 7e405d94d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
42 changed files with 10955 additions and 873 deletions

2
.gitignore vendored
View file

@ -29,6 +29,8 @@
*.DS_Store
core_test
!core_test/
rpc_test
!rpc_test/
qt_test
nano_node
nano_wallet

View file

@ -292,6 +292,7 @@ add_subdirectory(nano/crypto_lib)
add_subdirectory(nano/secure)
add_subdirectory(nano/lib)
add_subdirectory(nano/node)
add_subdirectory(nano/rpc)
add_subdirectory(nano/nano_node)
if (NANO_TEST OR RAIBLOCKS_TEST)
@ -315,6 +316,7 @@ if (NANO_TEST OR RAIBLOCKS_TEST)
"${CMAKE_SOURCE_DIR}/gtest/googletest/include")
add_subdirectory(nano/core_test)
add_subdirectory(nano/rpc_test)
add_subdirectory(nano/slow_test)
endif ()

View file

@ -65,6 +65,9 @@ run_tests() {
fi
done
xvfb_run_ ./rpc_test
rpc_test_res=${?}
xvfb_run_ ./qt_test
qt_test_res=${?}
@ -72,6 +75,7 @@ run_tests() {
load_test_res=${?}
echo "Core Test return code: ${core_test_res}"
echo "RPC Test return code: ${rpc_test_res}"
echo "QT Test return code: ${qt_test_res}"
echo "Load Test return code: ${load_test_res}"
return ${core_test_res}

View file

@ -3,9 +3,10 @@
#include <gtest/gtest.h>
#include <memory>
#include <nano/core_test/testutil.hpp>
#include <nano/lib/ipc_client.hpp>
#include <nano/node/ipc.hpp>
#include <nano/node/rpc.hpp>
#include <nano/node/testing.hpp>
#include <nano/rpc/rpc.hpp>
#include <sstream>
#include <vector>
@ -20,7 +21,7 @@ TEST (ipc, asynchronous)
nano::ipc::ipc_server ipc (*system.nodes[0], rpc);
nano::ipc::ipc_client client (system.nodes[0]->io_ctx);
auto req (client.prepare_request (nano::ipc::payload_encoding::json_legacy, std::string (R"({"action": "block_count"})")));
auto req (nano::ipc::prepare_request (nano::ipc::payload_encoding::json_legacy, std::string (R"({"action": "block_count"})")));
auto res (std::make_shared<std::vector<uint8_t>> ());
std::atomic<bool> call_completed{ false };
client.async_connect ("::1", 24077, [&client, &req, &res, &call_completed](nano::error err) {
@ -61,13 +62,13 @@ TEST (ipc, synchronous)
system.nodes[0]->config.ipc_config.transport_tcp.enabled = true;
system.nodes[0]->config.ipc_config.transport_tcp.port = 24077;
nano::ipc::ipc_server ipc (*system.nodes[0], rpc);
nano::ipc::rpc_ipc_client client (system.nodes[0]->io_ctx);
nano::ipc::ipc_client client (system.nodes[0]->io_ctx);
// Start blocking IPC client in a separate thread
std::atomic<bool> call_completed{ false };
std::thread client_thread ([&client, &call_completed]() {
client.connect ("::1", 24077);
std::string response (client.request (std::string (R"({"action": "block_count"})")));
std::string response (nano::ipc::request (client, std::string (R"({"action": "block_count"})")));
std::stringstream ss;
ss << response;
// Make sure the response is valid json

View file

@ -8,8 +8,8 @@
#include <nano/core_test/testutil.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <nano/node/common.hpp>
#include <nano/node/rpc.hpp>
#include <nano/node/testing.hpp>
#include <nano/rpc/rpc.hpp>
using namespace std::chrono_literals;

View file

@ -15,20 +15,27 @@ add_library (nano_lib
errors.hpp
errors.cpp
expected.hpp
blockbuilders.cpp
blockbuilders.hpp
blocks.cpp
blockbuilders.cpp
blocks.hpp
blocks.cpp
config.hpp
config.cpp
interface.cpp
interface.h
interface.cpp
ipc.hpp
ipc.cpp
ipc_client.hpp
ipc_client.cpp
jsonconfig.hpp
numbers.cpp
logger_mt.hpp
rpcconfig.hpp
rpcconfig.cpp
numbers.hpp
numbers.cpp
timer.hpp
utility.cpp
utility.hpp
utility.cpp
work.hpp
work.cpp)

View file

@ -7,6 +7,15 @@
#include <nano/lib/numbers.hpp>
#include <string>
#define xstr(a) ver_str (a)
#define ver_str(a) #a
/**
* Returns build version information
*/
static const char * NANO_MAJOR_MINOR_VERSION = xstr (NANO_VERSION_MAJOR) "." xstr (NANO_VERSION_MINOR);
static const char * NANO_MAJOR_MINOR_RC_VERSION = xstr (NANO_VERSION_MAJOR) "." xstr (NANO_VERSION_MINOR) "RC" xstr (NANO_VERSION_PATCH);
namespace nano
{
/**

43
nano/lib/ipc.cpp Normal file
View file

@ -0,0 +1,43 @@
#include <nano/lib/ipc.hpp>
nano::ipc::socket_base::socket_base (boost::asio::io_context & io_ctx_a) :
io_timer (io_ctx_a)
{
}
void nano::ipc::socket_base::timer_start (std::chrono::seconds timeout_a)
{
if (timeout_a < std::chrono::seconds::max ())
{
io_timer.expires_from_now (boost::posix_time::seconds (static_cast<long> (timeout_a.count ())));
io_timer.async_wait ([this](const boost::system::error_code & ec) {
if (!ec)
{
this->timer_expired ();
}
});
}
}
void nano::ipc::socket_base::timer_expired ()
{
close ();
}
void nano::ipc::socket_base::timer_cancel ()
{
boost::system::error_code ec;
io_timer.cancel (ec);
assert (!ec);
}
nano::ipc::dsock_file_remover::dsock_file_remover (std::string const & file_a) :
filename (file_a)
{
std::remove (filename.c_str ());
}
nano::ipc::dsock_file_remover::~dsock_file_remover ()
{
std::remove (filename.c_str ());
}

83
nano/lib/ipc.hpp Normal file
View file

@ -0,0 +1,83 @@
#pragma once
#include <atomic>
#include <boost/asio.hpp>
#include <boost/property_tree/ptree.hpp>
#include <string>
namespace nano
{
namespace ipc
{
/**
* The IPC framing format is simple: preamble followed by an encoding specific payload.
* Preamble is uint8_t {'N', encoding_type, reserved, reserved}. Reserved bytes MUST be zero.
* @note This is intentionally not an enum class as the values are only used as vector indices.
*/
enum preamble_offset
{
/** Always 'N' */
lead = 0,
/** One of the payload_encoding values */
encoding = 1,
/** Always zero */
reserved_1 = 2,
/** Always zero */
reserved_2 = 3,
};
/** Abstract base type for sockets, implementing timer logic and a close operation */
class socket_base
{
public:
socket_base (boost::asio::io_context & io_ctx_a);
virtual ~socket_base () = default;
/** Close socket */
virtual void close () = 0;
/**
* Start IO timer.
* @param timeout_a Seconds to wait. To wait indefinitely, use std::chrono::seconds::max ()
*/
void timer_start (std::chrono::seconds timeout_a);
void timer_expired ();
void timer_cancel ();
private:
/** IO operation timer */
boost::asio::deadline_timer io_timer;
};
/**
* Payload encodings; add protobuf, flatbuffers and so on as needed.
*/
enum class payload_encoding : uint8_t
{
/**
* Request is preamble followed by 32-bit BE payload length and payload bytes.
* Response is 32-bit BE payload length followed by payload bytes.
*/
json_legacy = 1
};
/** IPC transport interface */
class transport
{
public:
virtual void stop () = 0;
virtual ~transport () = default;
};
/** The domain socket file is attempted to be removed at both startup and shutdown. */
class dsock_file_remover final
{
public:
dsock_file_remover (std::string const & file_a);
~dsock_file_remover ();
private:
std::string filename;
};
}
}

227
nano/lib/ipc_client.cpp Normal file
View file

@ -0,0 +1,227 @@
#include <boost/endian/conversion.hpp>
#include <boost/polymorphic_cast.hpp>
#include <nano/lib/ipc.hpp>
#include <nano/lib/ipc_client.hpp>
namespace
{
/** Socket agnostic IO interface */
class channel
{
public:
virtual void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) = 0;
virtual void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) = 0;
};
/** Domain and TCP client socket */
template <typename SOCKET_TYPE, typename ENDPOINT_TYPE>
class socket_client : public nano::ipc::socket_base, public channel
{
public:
socket_client (boost::asio::io_context & io_ctx_a, ENDPOINT_TYPE endpoint_a) :
socket_base (io_ctx_a), endpoint (endpoint_a), socket (io_ctx_a), resolver (io_ctx_a)
{
}
void async_resolve (std::string const & host_a, uint16_t port_a, std::function<void(boost::system::error_code const &, boost::asio::ip::tcp::endpoint)> callback_a)
{
this->timer_start (io_timeout);
resolver.async_resolve (boost::asio::ip::tcp::resolver::query (host_a, std::to_string (port_a)), [this, callback_a](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator endpoint_iterator_a) {
this->timer_cancel ();
boost::asio::ip::tcp::resolver::iterator end;
if (!ec && endpoint_iterator_a != end)
{
endpoint = *endpoint_iterator_a;
callback_a (ec, *endpoint_iterator_a);
}
else
{
callback_a (ec, *end);
}
});
}
void async_connect (std::function<void(boost::system::error_code const &)> callback_a)
{
this->timer_start (io_timeout);
socket.async_connect (endpoint, [this, callback_a](boost::system::error_code const & ec) {
this->timer_cancel ();
callback_a (ec);
});
}
void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) override
{
this->timer_start (io_timeout);
buffer_a->resize (size_a);
boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), [this, callback_a](boost::system::error_code const & ec, size_t size_a) {
this->timer_cancel ();
callback_a (ec, size_a);
});
}
void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) override
{
this->timer_start (io_timeout);
boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) {
this->timer_cancel ();
callback_a (ec, size_a);
});
}
/** Shut down and close socket */
void close () override
{
socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both);
socket.close ();
}
private:
ENDPOINT_TYPE endpoint;
SOCKET_TYPE socket;
boost::asio::ip::tcp::resolver resolver;
std::chrono::seconds io_timeout{ 60 };
};
/**
* PIMPL class for ipc_client. This ensures that socket_client and boost details can
* stay out of the header file.
*/
class client_impl : public nano::ipc::ipc_client_impl
{
public:
client_impl (boost::asio::io_context & io_ctx_a) :
io_ctx (io_ctx_a)
{
}
void connect (std::string const & host_a, uint16_t port_a, std::function<void(nano::error)> callback_a)
{
tcp_client = std::make_shared<socket_client<boost::asio::ip::tcp::socket, boost::asio::ip::tcp::endpoint>> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a));
tcp_client->async_resolve (host_a, port_a, [this, callback_a](boost::system::error_code const & ec_resolve_a, boost::asio::ip::tcp::endpoint endpoint_a) {
if (!ec_resolve_a)
{
this->tcp_client->async_connect ([callback_a](const boost::system::error_code & ec_connect_a) {
callback_a (nano::error (ec_connect_a));
});
}
else
{
callback_a (nano::error (ec_resolve_a));
}
});
}
nano::error connect (std::string const & path_a)
{
nano::error err;
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
domain_client = std::make_shared<socket_client<boost::asio::local::stream_protocol::socket, boost::asio::local::stream_protocol::endpoint>> (io_ctx, boost::asio::local::stream_protocol::endpoint (path_a));
#else
err = nano::error ("Domain sockets are not supported by this platform");
#endif
return err;
}
channel & get_channel ()
{
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
return tcp_client ? static_cast<channel &> (*tcp_client) : static_cast<channel &> (*domain_client);
#else
return static_cast<channel &> (*tcp_client);
#endif
}
private:
boost::asio::io_context & io_ctx;
std::shared_ptr<socket_client<boost::asio::ip::tcp::socket, boost::asio::ip::tcp::endpoint>> tcp_client;
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
std::shared_ptr<socket_client<boost::asio::local::stream_protocol::socket, boost::asio::local::stream_protocol::endpoint>> domain_client;
#endif
};
}
nano::ipc::ipc_client::ipc_client (boost::asio::io_context & io_ctx_a) :
io_ctx (io_ctx_a)
{
}
nano::error nano::ipc::ipc_client::connect (std::string const & path_a)
{
impl = std::make_unique<client_impl> (io_ctx);
return boost::polymorphic_downcast<client_impl *> (impl.get ())->connect (path_a);
}
void nano::ipc::ipc_client::async_connect (std::string const & host_a, uint16_t port_a, std::function<void(nano::error)> callback_a)
{
impl = std::make_unique<client_impl> (io_ctx);
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
client->connect (host_a, port_a, callback_a);
}
nano::error nano::ipc::ipc_client::connect (std::string const & host, uint16_t port)
{
std::promise<nano::error> result_l;
async_connect (host, port, [&result_l](nano::error err_a) {
result_l.set_value (err_a);
});
return result_l.get_future ().get ();
}
void nano::ipc::ipc_client::async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(nano::error, size_t)> callback_a)
{
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
client->get_channel ().async_write (buffer_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_written_a) {
callback_a (nano::error (ec_a), bytes_written_a);
});
}
void nano::ipc::ipc_client::async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(nano::error, size_t)> callback_a)
{
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
client->get_channel ().async_read (buffer_a, size_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_read_a) {
callback_a (nano::error (ec_a), bytes_read_a);
});
}
std::shared_ptr<std::vector<uint8_t>> nano::ipc::prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a)
{
auto buffer_l (std::make_shared<std::vector<uint8_t>> ());
if (encoding_a == nano::ipc::payload_encoding::json_legacy)
{
buffer_l->push_back ('N');
buffer_l->push_back (static_cast<uint8_t> (encoding_a));
buffer_l->push_back (0);
buffer_l->push_back (0);
auto payload_length = static_cast<uint32_t> (payload_a.size ());
uint32_t be = boost::endian::native_to_big (payload_length);
char * chars = reinterpret_cast<char *> (&be);
buffer_l->insert (buffer_l->end (), chars, chars + sizeof (uint32_t));
buffer_l->insert (buffer_l->end (), payload_a.begin (), payload_a.end ());
}
return buffer_l;
}
std::string nano::ipc::request (nano::ipc::ipc_client & ipc_client, std::string const & rpc_action_a)
{
auto req (prepare_request (nano::ipc::payload_encoding::json_legacy, rpc_action_a));
auto res (std::make_shared<std::vector<uint8_t>> ());
std::promise<std::string> result_l;
// clang-format off
ipc_client.async_write (req, [&ipc_client, &res, &result_l](nano::error err_a, size_t size_a) {
// Read length
ipc_client.async_read (res, sizeof (uint32_t), [&ipc_client, &res, &result_l](nano::error err_read_a, size_t size_read_a) {
uint32_t payload_size_l = boost::endian::big_to_native (*reinterpret_cast<uint32_t *> (res->data ()));
// Read json payload
ipc_client.async_read (res, payload_size_l, [&res, &result_l](nano::error err_read_a, size_t size_read_a) {
result_l.set_value (std::string (res->begin (), res->end ()));
});
});
});
// clang-format on
return result_l.get_future ().get ();
}

60
nano/lib/ipc_client.hpp Normal file
View file

@ -0,0 +1,60 @@
#pragma once
#include <atomic>
#include <boost/asio.hpp>
#include <boost/property_tree/ptree.hpp>
#include <nano/lib/errors.hpp>
#include <nano/lib/ipc.hpp>
#include <string>
#include <vector>
namespace nano
{
namespace ipc
{
class ipc_client_impl
{
public:
virtual ~ipc_client_impl () = default;
};
/** IPC client */
class ipc_client
{
public:
ipc_client (boost::asio::io_context & io_ctx_a);
ipc_client (ipc_client && ipc_client) = default;
virtual ~ipc_client () = default;
/** Connect to a domain socket */
nano::error connect (std::string const & path);
/** Connect to a tcp socket synchronously */
nano::error connect (std::string const & host, uint16_t port);
/** Connect to a tcp socket asynchronously */
void async_connect (std::string const & host, uint16_t port, std::function<void(nano::error)> callback);
/** Write buffer asynchronously */
void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(nano::error, size_t)> callback_a);
/** Read \p size_a bytes asynchronously */
void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(nano::error, size_t)> callback_a);
private:
boost::asio::io_context & io_ctx;
// PIMPL pattern to hide implementation details
std::unique_ptr<ipc_client_impl> impl;
};
/** Convenience function for making synchronous IPC calls. The client must be connected */
std::string request (nano::ipc::ipc_client & ipc_client, std::string const & rpc_action_a);
/**
* Returns a buffer with an IPC preamble for the given \p encoding_a followed by the payload. Depending on encoding,
* the buffer may contain a payload length or end sentinel.
*/
std::shared_ptr<std::vector<uint8_t>> prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a);
}
}

View file

@ -57,11 +57,11 @@ public:
}
/**
* Reads a json object from the stream and if it was changed, write the object back to the stream.
* Reads a json object from the stream
* @return nano::error&, including a descriptive error message if the config file is malformed.
*/
template <typename T>
nano::error & read_and_update (T & object, boost::filesystem::path const & path_a)
nano::error & read (boost::filesystem::path const & path_a)
{
std::fstream stream;
open_or_create (stream, path_a.string ());
@ -80,23 +80,35 @@ public:
}
}
stream.close ();
if (!*error)
}
return *error;
}
/**
* Reads a json object from the stream and if it was changed, write the object back to the stream.
* @return nano::error&, including a descriptive error message if the config file is malformed.
*/
template <typename T>
nano::error & read_and_update (T & object, boost::filesystem::path const & path_a)
{
read<T> (path_a);
if (!*error)
{
std::fstream stream;
auto updated (false);
*error = object.deserialize_json (updated, *this);
if (!*error && updated)
{
auto updated (false);
*error = object.deserialize_json (updated, *this);
if (!*error && updated)
stream.open (path_a.string (), std::ios_base::out | std::ios_base::trunc);
try
{
stream.open (path_a.string (), std::ios_base::out | std::ios_base::trunc);
try
{
boost::property_tree::write_json (stream, tree);
}
catch (std::runtime_error const & ex)
{
*error = ex;
}
stream.close ();
boost::property_tree::write_json (stream, tree);
}
catch (std::runtime_error const & ex)
{
*error = ex;
}
stream.close ();
}
}
return *error;

80
nano/lib/logger_mt.hpp Normal file
View file

@ -0,0 +1,80 @@
#pragma once
#include <boost/log/sources/logger.hpp>
#include <boost/log/trivial.hpp>
#include <chrono>
namespace nano
{
// A wrapper around a boost logger object to allow minimum
// time spaced output to prevent logging happening too quickly.
class logger_mt
{
private:
void add_to_stream (boost::log::record_ostream & stream)
{
}
template <typename LogItem, typename... LogItems>
void add_to_stream (boost::log::record_ostream & stream, const LogItem & first_log_item, LogItems &&... remainder_log_items)
{
stream << first_log_item;
add_to_stream (stream, remainder_log_items...);
}
template <typename... LogItems>
void output (LogItems &&... log_items)
{
boost::log::record rec = boost_logger_mt.open_record ();
if (rec)
{
boost::log::record_ostream strm (rec);
add_to_stream (strm, std::forward<LogItems> (log_items)...);
strm.flush ();
boost_logger_mt.push_record (std::move (rec));
}
}
public:
/**
* @param min_log_delta_a The minimum time between successive output
*/
explicit logger_mt (std::chrono::milliseconds const & min_log_delta_a) :
min_log_delta (min_log_delta_a)
{
}
/*
* @param log_items A collection of objects with overloaded operator<< to be output to the log file
*/
template <typename... LogItems>
void always_log (LogItems &&... log_items)
{
output (std::forward<LogItems> (log_items)...);
}
/*
* @param log_items Output to the log file if the last write was over min_log_delta time ago.
* @return true if the log was successful
*/
template <typename... LogItems>
bool try_log (LogItems &&... log_items)
{
auto error (true);
auto time_now = std::chrono::steady_clock::now ();
if (((time_now - last_log_time) > min_log_delta) || last_log_time == std::chrono::steady_clock::time_point{})
{
output (std::forward<LogItems> (log_items)...);
last_log_time = time_now;
error = false;
}
return error;
}
std::chrono::milliseconds min_log_delta;
private:
std::chrono::steady_clock::time_point last_log_time;
boost::log::sources::logger_mt boost_logger_mt;
};
}

View file

@ -1,12 +1,6 @@
#include <nano/lib/config.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <nano/node/rpcconfig.hpp>
nano::rpc_secure_config::rpc_secure_config () :
enable (false),
verbose_logging (false)
{
}
#include <nano/lib/rpcconfig.hpp>
nano::error nano::rpc_secure_config::serialize_json (nano::jsonconfig & json) const
{

View file

@ -10,17 +10,16 @@ namespace nano
class jsonconfig;
/** Configuration options for RPC TLS */
class rpc_secure_config
class rpc_secure_config final
{
public:
rpc_secure_config ();
nano::error serialize_json (nano::jsonconfig &) const;
nano::error deserialize_json (nano::jsonconfig &);
/** If true, enable TLS */
bool enable;
bool enable{ false };
/** If true, log certificate verification details */
bool verbose_logging;
bool verbose_logging{ false };
/** Must be set if the private key PEM is password protected */
std::string server_key_passphrase;
/** Path to certificate- or chain file. Must be PEM formatted. */
@ -33,10 +32,10 @@ public:
std::string client_certs_path;
};
class rpc_config
class rpc_config final
{
public:
rpc_config (bool = false);
explicit rpc_config (bool = false);
nano::error serialize_json (nano::jsonconfig &) const;
nano::error deserialize_json (bool & upgraded_a, nano::jsonconfig &);
nano::network_constants network_constants;

View file

@ -130,6 +130,49 @@ void nano::thread_attributes::set (boost::thread::attributes & attrs)
attrs_l->set_stack_size (8000000); //8MB
}
nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned service_threads_a)
{
boost::thread::attributes attrs;
nano::thread_attributes::set (attrs);
for (auto i (0u); i < service_threads_a; ++i)
{
threads.push_back (boost::thread (attrs, [&io_ctx_a]() {
nano::thread_role::set (nano::thread_role::name::io);
try
{
io_ctx_a.run ();
}
catch (...)
{
#ifndef NDEBUG
/*
* In a release build, catch and swallow the
* io_context exception, in debug mode pass it
* on
*/
throw;
#endif
}
}));
}
}
nano::thread_runner::~thread_runner ()
{
join ();
}
void nano::thread_runner::join ()
{
for (auto & i : threads)
{
if (i.joinable ())
{
i.join ();
}
}
}
/*
* Backing code for "release_assert", which is itself a macro
*/

View file

@ -1,5 +1,6 @@
#pragma once
#include <boost/asio.hpp>
#include <boost/filesystem.hpp>
#include <boost/system/error_code.hpp>
#include <boost/thread/thread.hpp>
@ -108,6 +109,15 @@ namespace thread_attributes
void set (boost::thread::attributes &);
}
class thread_runner final
{
public:
thread_runner (boost::asio::io_context &, unsigned);
~thread_runner ();
void join ();
std::vector<boost::thread> threads;
};
template <typename... T>
class observer_set final
{

View file

@ -3,8 +3,9 @@
#include <nano/nano_node/daemon.hpp>
#include <nano/node/cli.hpp>
#include <nano/node/node.hpp>
#include <nano/node/rpc.hpp>
#include <nano/node/testing.hpp>
#include <nano/rpc/rpc.hpp>
#include <nano/rpc/rpc_handler.hpp>
#include <sstream>
#include <argon2.h>

View file

@ -5,9 +5,9 @@
#include <nano/nano_wallet/icon.hpp>
#include <nano/node/cli.hpp>
#include <nano/node/ipc.hpp>
#include <nano/node/rpc.hpp>
#include <nano/node/working.hpp>
#include <nano/qt/qt.hpp>
#include <nano/rpc/rpc.hpp>
#include <boost/make_shared.hpp>
#include <boost/program_options.hpp>

View file

@ -1,8 +1,8 @@
#include <nano/lib/errors.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/cli.hpp>
#include <nano/node/rpc.hpp>
#include <nano/node/working.hpp>
#include <nano/rpc/rpc.hpp>
#include <boost/make_shared.hpp>
#include <boost/program_options.hpp>

View file

@ -1,7 +1,3 @@
if (NANO_SECURE_RPC OR RAIBLOCKS_SECURE_RPC)
set (secure_rpc_sources rpc_secure.cpp rpc_secure.hpp)
endif ()
if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
# No opencl
elseif (${CMAKE_SYSTEM_NAME} MATCHES "Windows")
@ -45,10 +41,6 @@ add_library (node
portmapping.cpp
repcrawler.hpp
repcrawler.cpp
rpc.hpp
rpc.cpp
rpcconfig.hpp
rpcconfig.cpp
testing.hpp
testing.cpp
transport/tcp.cpp
@ -75,6 +67,7 @@ add_library (node
target_link_libraries (node
secure
nano_lib
rpc
libminiupnpc-static
argon2
lmdb

View file

@ -2,7 +2,7 @@
#include <nano/lib/errors.hpp>
#include <nano/node/node.hpp>
#include <nano/node/rpc.hpp>
#include <nano/rpc/rpc.hpp>
namespace nano
{

View file

@ -11,87 +11,25 @@
#include <fstream>
#include <future>
#include <iostream>
#include <nano/lib/config.hpp>
#include <nano/lib/ipc.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/common.hpp>
#include <nano/node/ipc.hpp>
#include <nano/node/node.hpp>
#include <nano/node/rpc.hpp>
#include <nano/rpc/rpc.hpp>
#include <nano/rpc/rpc_handler.hpp>
#include <thread>
using namespace boost::log;
namespace
{
/**
* The IPC framing format is simple: preamble followed by an encoding specific payload.
* Preamble is uint8_t {'N', encoding_type, reserved, reserved}. Reserved bytes MUST be zero.
* @note This is intentionally not an enum class as the values are only used as vector indices.
*/
enum preamble_offset
{
/** Always 'N' */
lead = 0,
/** One of the payload_encoding values */
encoding = 1,
/** Always zero */
reserved_1 = 2,
/** Always zero */
reserved_2 = 3,
};
}
/** Abstract base type for sockets, implementing timer logic and a close operation */
class socket_base
{
public:
socket_base (boost::asio::io_context & io_ctx_a) :
io_timer (io_ctx_a)
{
}
virtual ~socket_base () = default;
/** Close socket */
virtual void close () = 0;
/**
* Start IO timer.
* @param timeout_a Seconds to wait. To wait indefinitely, use std::chrono::seconds::max ()
*/
void timer_start (std::chrono::seconds timeout_a)
{
if (timeout_a < std::chrono::seconds::max ())
{
io_timer.expires_from_now (boost::posix_time::seconds (static_cast<long> (timeout_a.count ())));
io_timer.async_wait ([this](const boost::system::error_code & ec) {
if (!ec)
{
this->timer_expired ();
}
});
}
}
void timer_expired ()
{
close ();
}
void timer_cancel ()
{
boost::system::error_code ec;
io_timer.cancel (ec);
assert (!ec);
}
private:
/** IO operation timer */
boost::asio::deadline_timer io_timer;
};
/**
* A session represents an inbound connection over which multiple requests/reponses are transmitted.
*/
template <typename SOCKET_TYPE>
class session : public socket_base, public std::enable_shared_from_this<session<SOCKET_TYPE>>
class session : public nano::ipc::socket_base, public std::enable_shared_from_this<session<SOCKET_TYPE>>
{
public:
session (nano::ipc::ipc_server & server_a, boost::asio::io_context & io_ctx_a, nano::ipc::ipc_config_transport & config_transport_a) :
@ -202,14 +140,14 @@ public:
// Await next request indefinitely
buffer.resize (sizeof (buffer_size));
async_read_exactly (buffer.data (), buffer.size (), std::chrono::seconds::max (), [this_l]() {
if (this_l->buffer[preamble_offset::lead] != 'N' || this_l->buffer[preamble_offset::reserved_1] != 0 || this_l->buffer[preamble_offset::reserved_2] != 0)
if (this_l->buffer[nano::ipc::preamble_offset::lead] != 'N' || this_l->buffer[nano::ipc::preamble_offset::reserved_1] != 0 || this_l->buffer[nano::ipc::preamble_offset::reserved_2] != 0)
{
if (this_l->node.config.logging.log_ipc ())
{
this_l->node.logger.always_log ("IPC: Invalid preamble");
}
}
else if (this_l->buffer[preamble_offset::encoding] == static_cast<uint8_t> (nano::ipc::payload_encoding::json_legacy))
else if (this_l->buffer[nano::ipc::preamble_offset::encoding] == static_cast<uint8_t> (nano::ipc::payload_encoding::json_legacy))
{
// Length of payload
this_l->async_read_exactly (&this_l->buffer_size, sizeof (this_l->buffer_size), [this_l]() {
@ -349,22 +287,7 @@ private:
std::unique_ptr<boost::asio::io_context> io_ctx;
std::unique_ptr<ACCEPTOR_TYPE> acceptor;
};
/** The domain socket file is attempted removed at both startup and shutdown. */
class nano::ipc::dsock_file_remover final
{
public:
dsock_file_remover (std::string const & file_a) :
filename (file_a)
{
std::remove (filename.c_str ());
}
~dsock_file_remover ()
{
std::remove (filename.c_str ());
}
std::string filename;
};
}
nano::ipc::ipc_server::ipc_server (nano::node & node_a, nano::rpc & rpc_a) :
node (node_a), rpc (rpc_a)
@ -409,221 +332,3 @@ void nano::ipc::ipc_server::stop ()
transport->stop ();
}
}
/** Socket agnostic IO interface */
class channel
{
public:
virtual void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) = 0;
virtual void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) = 0;
};
/** Domain and TCP client socket */
template <typename SOCKET_TYPE, typename ENDPOINT_TYPE>
class socket_client : public socket_base, public channel
{
public:
socket_client (boost::asio::io_context & io_ctx_a, ENDPOINT_TYPE endpoint_a) :
socket_base (io_ctx_a), endpoint (endpoint_a), socket (io_ctx_a), resolver (io_ctx_a)
{
}
void async_resolve (std::string const & host_a, uint16_t port_a, std::function<void(boost::system::error_code const &, boost::asio::ip::tcp::endpoint)> callback_a)
{
this->timer_start (io_timeout);
resolver.async_resolve (boost::asio::ip::tcp::resolver::query (host_a, std::to_string (port_a)), [this, callback_a](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator endpoint_iterator_a) {
this->timer_cancel ();
boost::asio::ip::tcp::resolver::iterator end;
if (!ec && endpoint_iterator_a != end)
{
endpoint = *endpoint_iterator_a;
callback_a (ec, *endpoint_iterator_a);
}
else
{
callback_a (ec, *end);
}
});
}
void async_connect (std::function<void(boost::system::error_code const &)> callback_a)
{
this->timer_start (io_timeout);
socket.async_connect (endpoint, [this, callback_a](boost::system::error_code const & ec) {
this->timer_cancel ();
callback_a (ec);
});
}
void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) override
{
this->timer_start (io_timeout);
buffer_a->resize (size_a);
boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), [this, callback_a](boost::system::error_code const & ec, size_t size_a) {
this->timer_cancel ();
callback_a (ec, size_a);
});
}
void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) override
{
this->timer_start (io_timeout);
boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) {
this->timer_cancel ();
callback_a (ec, size_a);
});
}
/** Shut down and close socket */
void close () override
{
socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both);
socket.close ();
}
private:
ENDPOINT_TYPE endpoint;
SOCKET_TYPE socket;
boost::asio::ip::tcp::resolver resolver;
std::chrono::seconds io_timeout{ 60 };
};
/**
* PIMPL class for ipc_client. This ensures that socket_client and boost details can
* stay out of the header file.
*/
class client_impl : public nano::ipc::ipc_client_impl
{
public:
client_impl (boost::asio::io_context & io_ctx_a) :
io_ctx (io_ctx_a)
{
}
void connect (std::string const & host_a, uint16_t port_a, std::function<void(nano::error)> callback_a)
{
tcp_client = std::make_shared<socket_client<boost::asio::ip::tcp::socket, boost::asio::ip::tcp::endpoint>> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a));
tcp_client->async_resolve (host_a, port_a, [this, callback_a](boost::system::error_code const & ec_resolve_a, boost::asio::ip::tcp::endpoint endpoint_a) {
if (!ec_resolve_a)
{
this->tcp_client->async_connect ([callback_a](const boost::system::error_code & ec_connect_a) {
callback_a (nano::error (ec_connect_a));
});
}
else
{
callback_a (nano::error (ec_resolve_a));
}
});
}
nano::error connect (std::string const & path_a)
{
nano::error err;
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
domain_client = std::make_shared<socket_client<boost::asio::local::stream_protocol::socket, boost::asio::local::stream_protocol::endpoint>> (io_ctx, boost::asio::local::stream_protocol::endpoint (path_a));
#else
err = nano::error ("Domain sockets are not supported by this platform");
#endif
return err;
}
channel & get_channel ()
{
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
return tcp_client ? static_cast<channel &> (*tcp_client) : static_cast<channel &> (*domain_client);
#else
return static_cast<channel &> (*tcp_client);
#endif
}
private:
boost::asio::io_context & io_ctx;
std::shared_ptr<socket_client<boost::asio::ip::tcp::socket, boost::asio::ip::tcp::endpoint>> tcp_client;
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
std::shared_ptr<socket_client<boost::asio::local::stream_protocol::socket, boost::asio::local::stream_protocol::endpoint>> domain_client;
#endif
};
nano::ipc::ipc_client::ipc_client (boost::asio::io_context & io_ctx_a) :
io_ctx (io_ctx_a)
{
}
nano::error nano::ipc::ipc_client::connect (std::string const & path_a)
{
impl = std::make_unique<client_impl> (io_ctx);
return boost::polymorphic_downcast<client_impl *> (impl.get ())->connect (path_a);
}
void nano::ipc::ipc_client::async_connect (std::string const & host_a, uint16_t port_a, std::function<void(nano::error)> callback_a)
{
impl = std::make_unique<client_impl> (io_ctx);
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
client->connect (host_a, port_a, callback_a);
}
nano::error nano::ipc::ipc_client::connect (std::string const & host, uint16_t port)
{
std::promise<nano::error> result_l;
async_connect (host, port, [&result_l](nano::error err_a) {
result_l.set_value (err_a);
});
return result_l.get_future ().get ();
}
void nano::ipc::ipc_client::async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(nano::error, size_t)> callback_a)
{
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
client->get_channel ().async_write (buffer_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_written_a) {
callback_a (nano::error (ec_a), bytes_written_a);
});
}
void nano::ipc::ipc_client::async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(nano::error, size_t)> callback_a)
{
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
client->get_channel ().async_read (buffer_a, size_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_read_a) {
callback_a (nano::error (ec_a), bytes_read_a);
});
}
std::shared_ptr<std::vector<uint8_t>> nano::ipc::ipc_client::prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a)
{
auto buffer_l (std::make_shared<std::vector<uint8_t>> ());
if (encoding_a == nano::ipc::payload_encoding::json_legacy)
{
buffer_l->push_back ('N');
buffer_l->push_back (static_cast<uint8_t> (encoding_a));
buffer_l->push_back (0);
buffer_l->push_back (0);
auto payload_length = static_cast<uint32_t> (payload_a.size ());
uint32_t be = boost::endian::native_to_big (payload_length);
char * chars = reinterpret_cast<char *> (&be);
buffer_l->insert (buffer_l->end (), chars, chars + sizeof (uint32_t));
buffer_l->insert (buffer_l->end (), payload_a.begin (), payload_a.end ());
}
return buffer_l;
}
std::string nano::ipc::rpc_ipc_client::request (std::string const & rpc_action_a)
{
auto req (prepare_request (nano::ipc::payload_encoding::json_legacy, rpc_action_a));
auto res (std::make_shared<std::vector<uint8_t>> ());
std::promise<std::string> result_l;
async_write (req, [this, &res, &result_l](nano::error err_a, size_t size_a) {
// Read length
this->async_read (res, sizeof (uint32_t), [this, &res, &result_l](nano::error err_read_a, size_t size_read_a) {
uint32_t payload_size_l = boost::endian::big_to_native (*reinterpret_cast<uint32_t *> (res->data ()));
// Read json payload
this->async_read (res, payload_size_l, [&res, &result_l](nano::error err_read_a, size_t size_read_a) {
result_l.set_value (std::string (res->begin (), res->end ()));
});
});
});
return result_l.get_future ().get ();
}

View file

@ -1,46 +1,17 @@
#pragma once
#include <atomic>
#include <boost/asio.hpp>
#include <boost/property_tree/ptree.hpp>
#include <nano/lib/errors.hpp>
#include <nano/lib/ipc.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <string>
#include <vector>
namespace nano
{
class node;
class rpc;
}
namespace nano
{
namespace ipc
{
/**
* Payload encodings; add protobuf, flatbuffers and so on as needed.
*/
enum class payload_encoding : uint8_t
{
/**
* Request is preamble followed by 32-bit BE payload length and payload bytes.
* Response is 32-bit BE payload length followed by payload bytes.
*/
json_legacy = 1
};
/** Removes domain socket files on startup and shutdown */
class dsock_file_remover;
/** IPC transport interface */
class transport
{
public:
virtual void stop () = 0;
virtual ~transport () = default;
};
/** The IPC server accepts connections on one or more configured transports */
class ipc_server
{
@ -59,58 +30,5 @@ namespace ipc
std::unique_ptr<dsock_file_remover> file_remover;
std::vector<std::shared_ptr<nano::ipc::transport>> transports;
};
class ipc_client_impl
{
public:
virtual ~ipc_client_impl () = default;
};
/** IPC client */
class ipc_client
{
public:
ipc_client (boost::asio::io_context & io_ctx_a);
virtual ~ipc_client () = default;
/** Connect to a domain socket */
nano::error connect (std::string const & path);
/** Connect to a tcp socket synchronously */
nano::error connect (std::string const & host, uint16_t port);
/** Connect to a tcp socket asynchronously */
void async_connect (std::string const & host, uint16_t port, std::function<void(nano::error)> callback);
/** Write buffer asynchronously */
void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(nano::error, size_t)> callback_a);
/** Read \p size_a bytes asynchronously */
void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(nano::error, size_t)> callback_a);
/**
* Returns a buffer with an IPC preamble for the given \p encoding_a followed by the payload. Depending on encoding,
* the buffer may contain a payload length or end sentinel.
*/
std::shared_ptr<std::vector<uint8_t>> prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a);
private:
boost::asio::io_context & io_ctx;
// PIMPL pattern to hide implementation details
std::unique_ptr<ipc_client_impl> impl;
};
/** Convenience wrapper for making synchronous RPC calls via IPC */
class rpc_ipc_client : public ipc_client
{
public:
rpc_ipc_client (boost::asio::io_context & io_ctx_a) :
ipc_client (io_ctx_a)
{
}
/** Calls the RPC server via IPC and waits for the result. The client must be connected. */
std::string request (std::string const & rpc_action_a);
};
}
}

View file

@ -6,6 +6,7 @@
#include <cstdint>
#include <nano/lib/errors.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <nano/lib/logger_mt.hpp>
#define FATAL_LOG_PREFIX "FATAL ERROR: "
@ -13,79 +14,6 @@ using namespace std::chrono;
namespace nano
{
// A wrapper around a boost logger object to allow
// minimum time spaced output to prevent logging happening
// too quickly.
class logger_mt
{
private:
void add_to_stream (boost::log::record_ostream & stream)
{
}
template <typename LogItem, typename... LogItems>
void add_to_stream (boost::log::record_ostream & stream, const LogItem & first_log_item, LogItems &&... remainder_log_items)
{
stream << first_log_item;
add_to_stream (stream, remainder_log_items...);
}
template <typename... LogItems>
void output (LogItems &&... log_items)
{
boost::log::record rec = boost_logger_mt.open_record ();
if (rec)
{
boost::log::record_ostream strm (rec);
add_to_stream (strm, std::forward<LogItems> (log_items)...);
strm.flush ();
boost_logger_mt.push_record (std::move (rec));
}
}
public:
/**
* @param min_log_delta_a The minimum time between successive output
*/
explicit logger_mt (std::chrono::milliseconds const & min_log_delta_a) :
min_log_delta (min_log_delta_a)
{
}
/*
* @param log_items A collection of objects with overloaded operator<< to be output to the log file
*/
template <typename... LogItems>
void always_log (LogItems &&... log_items)
{
output (std::forward<LogItems> (log_items)...);
}
/*
* @param log_items Output to the log file if the last write was over min_log_delta time ago.
* @return true if the log was successful
*/
template <typename... LogItems>
bool try_log (LogItems &&... log_items)
{
auto error (true);
auto time_now = std::chrono::steady_clock::now ();
if (((time_now - last_log_time) > min_log_delta) || last_log_time == std::chrono::steady_clock::time_point{})
{
output (std::forward<LogItems> (log_items)...);
last_log_time = time_now;
error = false;
}
return error;
}
std::chrono::milliseconds min_log_delta;
private:
std::chrono::steady_clock::time_point last_log_time;
boost::log::sources::logger_mt boost_logger_mt;
};
class logging
{
public:

View file

@ -5,7 +5,7 @@
#include <nano/lib/timer.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/common.hpp>
#include <nano/node/rpc.hpp>
#include <nano/rpc/rpc.hpp>
#include <algorithm>
#include <cstdlib>
@ -3465,49 +3465,6 @@ int nano::node::store_version ()
return store.version_get (transaction);
}
nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned service_threads_a)
{
boost::thread::attributes attrs;
nano::thread_attributes::set (attrs);
for (auto i (0u); i < service_threads_a; ++i)
{
threads.push_back (boost::thread (attrs, [&io_ctx_a]() {
nano::thread_role::set (nano::thread_role::name::io);
try
{
io_ctx_a.run ();
}
catch (...)
{
#ifndef NDEBUG
/*
* In a release build, catch and swallow the
* io_context exception, in debug mode pass it
* on
*/
throw;
#endif
}
}));
}
}
nano::thread_runner::~thread_runner ()
{
join ();
}
void nano::thread_runner::join ()
{
for (auto & i : threads)
{
if (i.joinable ())
{
i.join ();
}
}
}
nano::inactive_node::inactive_node (boost::filesystem::path const & path_a, uint16_t peering_port_a) :
path (path_a),
io_context (std::make_shared<boost::asio::io_context> ()),

View file

@ -28,15 +28,6 @@
#include <boost/multi_index_container.hpp>
#include <boost/thread/thread.hpp>
#define xstr(a) ver_str (a)
#define ver_str(a) #a
/**
* Returns build version information
*/
static const char * NANO_MAJOR_MINOR_VERSION = xstr (NANO_VERSION_MAJOR) "." xstr (NANO_VERSION_MINOR);
static const char * NANO_MAJOR_MINOR_RC_VERSION = xstr (NANO_VERSION_MAJOR) "." xstr (NANO_VERSION_MINOR) "RC" xstr (NANO_VERSION_PATCH);
namespace nano
{
class channel;
@ -521,14 +512,6 @@ private:
std::unique_ptr<seq_con_info_component> collect_seq_con_info (node & node, const std::string & name);
class thread_runner final
{
public:
thread_runner (boost::asio::io_context &, unsigned);
~thread_runner ();
void join ();
std::vector<boost::thread> threads;
};
class inactive_node final
{
public:

View file

@ -4420,163 +4420,6 @@ void nano::rpc_handler::work_peers_clear ()
response_errors ();
}
nano::rpc_connection::rpc_connection (nano::node & node_a, nano::rpc & rpc_a) :
node (node_a.shared ()),
rpc (rpc_a),
socket (node_a.io_ctx)
{
responded.clear ();
}
void nano::rpc_connection::parse_connection ()
{
read ();
}
void nano::rpc_connection::prepare_head (unsigned version, boost::beast::http::status status)
{
res.version (version);
res.result (status);
res.set (boost::beast::http::field::allow, "POST, OPTIONS");
res.set (boost::beast::http::field::content_type, "application/json");
res.set (boost::beast::http::field::access_control_allow_origin, "*");
res.set (boost::beast::http::field::access_control_allow_methods, "POST, OPTIONS");
res.set (boost::beast::http::field::access_control_allow_headers, "Accept, Accept-Language, Content-Language, Content-Type");
res.set (boost::beast::http::field::connection, "close");
}
void nano::rpc_connection::write_result (std::string body, unsigned version, boost::beast::http::status status)
{
if (!responded.test_and_set ())
{
prepare_head (version, status);
res.body () = body;
res.prepare_payload ();
}
else
{
assert (false && "RPC already responded and should only respond once");
// Guards `res' from being clobbered while async_write is being serviced
}
}
void nano::rpc_connection::read ()
{
auto this_l (shared_from_this ());
boost::system::error_code header_error;
auto header_parser (std::make_shared<boost::beast::http::request_parser<boost::beast::http::empty_body>> ());
std::promise<size_t> header_available_promise;
std::future<size_t> header_available = header_available_promise.get_future ();
header_parser->body_limit (rpc.config.max_request_size);
if (!node->network_params.network.is_test_network ())
{
boost::beast::http::async_read_header (socket, buffer, *header_parser, [this_l, header_parser, &header_available_promise, &header_error](boost::system::error_code const & ec, size_t bytes_transferred) {
size_t header_response_bytes_written = 0;
if (!ec)
{
if (boost::iequals (header_parser->get ()[boost::beast::http::field::expect], "100-continue"))
{
boost::beast::http::response<boost::beast::http::empty_body> continue_response;
continue_response.version (11);
continue_response.result (boost::beast::http::status::continue_);
continue_response.set (boost::beast::http::field::server, "nano");
auto response_size (boost::beast::http::async_write (this_l->socket, continue_response, boost::asio::use_future));
header_response_bytes_written = response_size.get ();
}
}
else
{
header_error = ec;
this_l->node->logger.always_log ("RPC header error: ", ec.message ());
}
header_available_promise.set_value (header_response_bytes_written);
});
// Avait header
header_available.get ();
}
if (!header_error)
{
auto body_parser (std::make_shared<boost::beast::http::request_parser<boost::beast::http::string_body>> (std::move (*header_parser)));
boost::beast::http::async_read (socket, buffer, *body_parser, [this_l, body_parser](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
this_l->node->background ([this_l, body_parser]() {
auto & req (body_parser->get ());
auto start (std::chrono::steady_clock::now ());
auto version (req.version ());
std::string request_id (boost::str (boost::format ("%1%") % boost::io::group (std::hex, std::showbase, reinterpret_cast<uintptr_t> (this_l.get ()))));
auto response_handler ([this_l, version, start, request_id](boost::property_tree::ptree const & tree_a) {
std::stringstream ostream;
boost::property_tree::write_json (ostream, tree_a);
ostream.flush ();
auto body (ostream.str ());
this_l->write_result (body, version);
boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
this_l->write_completion_handler (this_l);
});
if (this_l->node->config.logging.log_rpc ())
{
this_l->node->logger.always_log (boost::str (boost::format ("RPC request %2% completed in: %1% microseconds") % std::chrono::duration_cast<std::chrono::microseconds> (std::chrono::steady_clock::now () - start).count () % request_id));
}
});
auto method = req.method ();
switch (method)
{
case boost::beast::http::verb::post:
{
auto handler (std::make_shared<nano::rpc_handler> (*this_l->node, this_l->rpc, req.body (), request_id, response_handler));
handler->process_request ();
break;
}
case boost::beast::http::verb::options:
{
this_l->prepare_head (version);
this_l->res.prepare_payload ();
boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
this_l->write_completion_handler (this_l);
});
break;
}
default:
{
error_response (response_handler, "Can only POST requests");
break;
}
}
});
}
else
{
this_l->node->logger.always_log ("RPC read error: ", ec.message ());
}
});
}
else
{
// Respond with the reason for the invalid header
auto response_handler ([this_l](boost::property_tree::ptree const & tree_a) {
std::stringstream ostream;
boost::property_tree::write_json (ostream, tree_a);
ostream.flush ();
auto body (ostream.str ());
this_l->write_result (body, 11);
boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
this_l->write_completion_handler (this_l);
});
});
error_response (response_handler, std::string ("Invalid header: ") + header_error.message ());
}
}
void nano::rpc_connection::write_completion_handler (std::shared_ptr<nano::rpc_connection> rpc_connection)
{
// Intentional no-op
}
namespace
{
std::string filter_request (boost::property_tree::ptree tree_a)

24
nano/rpc/CMakeLists.txt Normal file
View file

@ -0,0 +1,24 @@
if (NANO_SECURE_RPC OR RAIBLOCKS_SECURE_RPC)
set (secure_rpc_sources rpc_secure.hpp rpc_secure.cpp rpc_connection_secure.hpp rpc_connection_secure.cpp)
endif ()
add_library (rpc
${secure_rpc_sources}
rpc.hpp
rpc.cpp
rpc_connection.hpp
rpc_connection.cpp
rpc_handler.hpp
rpc_handler.cpp)
target_link_libraries (rpc
node
nano_lib
${OPENSSL_LIBRARIES}
Boost::boost)
target_compile_definitions(rpc
PRIVATE
-DNANO_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR}
-DNANO_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR}
-DNANO_VERSION_PATCH=${CPACK_PACKAGE_VERSION_PATCH})

184
nano/rpc/rpc.cpp Normal file
View file

@ -0,0 +1,184 @@
#include <boost/algorithm/string.hpp>
#include <nano/lib/config.hpp>
#include <nano/lib/interface.h>
#include <nano/node/node.hpp>
#include <nano/rpc/rpc.hpp>
#include <nano/rpc/rpc_connection.hpp>
#include <nano/rpc/rpc_handler.hpp>
#ifdef NANO_SECURE_RPC
#include <nano/rpc/rpc_secure.hpp>
#endif
#include <nano/lib/errors.hpp>
nano::rpc::rpc (boost::asio::io_context & io_ctx_a, nano::node & node_a, nano::rpc_config const & config_a) :
acceptor (io_ctx_a),
config (config_a),
node (node_a)
{
}
void nano::rpc::add_block_observer ()
{
node.observers.blocks.add ([this](std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::uint128_t const &, bool) {
observer_action (account_a);
});
}
void nano::rpc::start (bool rpc_enabled_a)
{
if (rpc_enabled_a)
{
auto endpoint (nano::tcp_endpoint (config.address, config.port));
acceptor.open (endpoint.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
boost::system::error_code ec;
acceptor.bind (endpoint, ec);
if (ec)
{
node.logger.always_log (boost::str (boost::format ("Error while binding for RPC on port %1%: %2%") % endpoint.port () % ec.message ()));
throw std::runtime_error (ec.message ());
}
acceptor.listen ();
}
add_block_observer ();
if (rpc_enabled_a)
{
accept ();
}
}
void nano::rpc::accept ()
{
auto connection (std::make_shared<nano::rpc_connection> (node, *this));
acceptor.async_accept (connection->socket, [this, connection](boost::system::error_code const & ec) {
if (ec != boost::asio::error::operation_aborted && acceptor.is_open ())
{
accept ();
}
if (!ec)
{
connection->parse_connection ();
}
else
{
this->node.logger.always_log (boost::str (boost::format ("Error accepting RPC connections: %1% (%2%)") % ec.message () % ec.value ()));
}
});
}
void nano::rpc::stop ()
{
acceptor.close ();
}
void nano::rpc::observer_action (nano::account const & account_a)
{
std::shared_ptr<nano::payment_observer> observer;
{
std::lock_guard<std::mutex> lock (mutex);
auto existing (payment_observers.find (account_a));
if (existing != payment_observers.end ())
{
observer = existing->second;
}
}
if (observer != nullptr)
{
observer->observe ();
}
}
nano::payment_observer::payment_observer (std::function<void(boost::property_tree::ptree const &)> const & response_a, nano::rpc & rpc_a, nano::account const & account_a, nano::amount const & amount_a) :
rpc (rpc_a),
account (account_a),
amount (amount_a),
response (response_a)
{
completed.clear ();
}
void nano::payment_observer::start (uint64_t timeout)
{
auto this_l (shared_from_this ());
rpc.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (timeout), [this_l]() {
this_l->complete (nano::payment_status::nothing);
});
}
nano::payment_observer::~payment_observer ()
{
}
void nano::payment_observer::observe ()
{
if (rpc.node.balance (account) >= amount.number ())
{
complete (nano::payment_status::success);
}
}
void nano::payment_observer::complete (nano::payment_status status)
{
auto already (completed.test_and_set ());
if (!already)
{
if (rpc.node.config.logging.log_rpc ())
{
rpc.node.logger.always_log (boost::str (boost::format ("Exiting payment_observer for account %1% status %2%") % account.to_account () % static_cast<unsigned> (status)));
}
switch (status)
{
case nano::payment_status::nothing:
{
boost::property_tree::ptree response_l;
response_l.put ("deprecated", "1");
response_l.put ("status", "nothing");
response (response_l);
break;
}
case nano::payment_status::success:
{
boost::property_tree::ptree response_l;
response_l.put ("deprecated", "1");
response_l.put ("status", "success");
response (response_l);
break;
}
default:
{
error_response (response, "Internal payment error");
break;
}
}
std::lock_guard<std::mutex> lock (rpc.mutex);
assert (rpc.payment_observers.find (account) != rpc.payment_observers.end ());
rpc.payment_observers.erase (account);
}
}
std::unique_ptr<nano::rpc> nano::get_rpc (boost::asio::io_context & io_ctx_a, nano::node & node_a, nano::rpc_config const & config_a)
{
std::unique_ptr<rpc> impl;
if (config_a.secure.enable)
{
#ifdef NANO_SECURE_RPC
impl.reset (new rpc_secure (io_ctx_a, node_a, config_a));
#else
std::cerr << "RPC configured for TLS, but the node is not compiled with TLS support" << std::endl;
#endif
}
else
{
impl.reset (new rpc (io_ctx_a, node_a, config_a));
}
return impl;
}

74
nano/rpc/rpc.hpp Normal file
View file

@ -0,0 +1,74 @@
#pragma once
#include <atomic>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <nano/lib/blocks.hpp>
#include <nano/lib/config.hpp>
#include <nano/lib/errors.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <nano/lib/rpcconfig.hpp>
#include <nano/secure/blockstore.hpp>
#include <nano/secure/utility.hpp>
#include <unordered_map>
namespace nano
{
class node;
enum class payment_status
{
not_a_status,
unknown,
nothing, // Timeout and nothing was received
//insufficient, // Timeout and not enough was received
//over, // More than requested received
//success_fork, // Amount received but it involved a fork
success // Amount received
};
class wallet;
class payment_observer;
class rpc
{
public:
rpc (boost::asio::io_context &, nano::node &, nano::rpc_config const &);
virtual ~rpc () = default;
/**
* Start serving RPC requests if \p rpc_enabled_a, otherwise this will only
* add a block observer since requests may still arrive via IPC.
*/
void start (bool rpc_enabled_a = true);
void add_block_observer ();
virtual void accept ();
void stop ();
void observer_action (nano::account const &);
boost::asio::ip::tcp::acceptor acceptor;
std::mutex mutex;
std::unordered_map<nano::account, std::shared_ptr<nano::payment_observer>> payment_observers;
nano::rpc_config config;
nano::node & node;
bool on;
};
class payment_observer : public std::enable_shared_from_this<nano::payment_observer>
{
public:
payment_observer (std::function<void(boost::property_tree::ptree const &)> const &, nano::rpc &, nano::account const &, nano::amount const &);
~payment_observer ();
void start (uint64_t);
void observe ();
void complete (nano::payment_status);
std::mutex mutex;
std::condition_variable condition;
nano::rpc & rpc;
nano::account account;
nano::amount amount;
std::function<void(boost::property_tree::ptree const &)> response;
std::atomic_flag completed;
};
/** Returns the correct RPC implementation based on TLS configuration */
std::unique_ptr<nano::rpc> get_rpc (boost::asio::io_context & io_ctx_a, nano::node & node_a, nano::rpc_config const & config_a);
}

163
nano/rpc/rpc_connection.cpp Normal file
View file

@ -0,0 +1,163 @@
#include <boost/algorithm/string/predicate.hpp>
#include <boost/format.hpp>
#include <nano/lib/config.hpp>
#include <nano/rpc/rpc.hpp>
#include <nano/rpc/rpc_connection.hpp>
#include <nano/rpc/rpc_handler.hpp>
nano::rpc_connection::rpc_connection (nano::node & node_a, nano::rpc & rpc_a) :
node (node_a.shared ()),
rpc (rpc_a),
socket (node_a.io_ctx)
{
responded.clear ();
}
void nano::rpc_connection::parse_connection ()
{
read ();
}
void nano::rpc_connection::prepare_head (unsigned version, boost::beast::http::status status)
{
res.version (version);
res.result (status);
res.set (boost::beast::http::field::allow, "POST, OPTIONS");
res.set (boost::beast::http::field::content_type, "application/json");
res.set (boost::beast::http::field::access_control_allow_origin, "*");
res.set (boost::beast::http::field::access_control_allow_methods, "POST, OPTIONS");
res.set (boost::beast::http::field::access_control_allow_headers, "Accept, Accept-Language, Content-Language, Content-Type");
res.set (boost::beast::http::field::connection, "close");
}
void nano::rpc_connection::write_result (std::string body, unsigned version, boost::beast::http::status status)
{
if (!responded.test_and_set ())
{
prepare_head (version, status);
res.body () = body;
res.prepare_payload ();
}
else
{
assert (false && "RPC already responded and should only respond once");
// Guards `res' from being clobbered while async_write is being serviced
}
}
void nano::rpc_connection::read ()
{
auto this_l (shared_from_this ());
boost::system::error_code header_error;
auto header_parser (std::make_shared<boost::beast::http::request_parser<boost::beast::http::empty_body>> ());
std::promise<size_t> header_available_promise;
std::future<size_t> header_available = header_available_promise.get_future ();
header_parser->body_limit (rpc.config.max_request_size);
if (!node->network_params.network.is_test_network ())
{
boost::beast::http::async_read_header (socket, buffer, *header_parser, [this_l, header_parser, &header_available_promise, &header_error](boost::system::error_code const & ec, size_t bytes_transferred) {
size_t header_response_bytes_written = 0;
if (!ec)
{
if (boost::iequals (header_parser->get ()[boost::beast::http::field::expect], "100-continue"))
{
boost::beast::http::response<boost::beast::http::empty_body> continue_response;
continue_response.version (11);
continue_response.result (boost::beast::http::status::continue_);
continue_response.set (boost::beast::http::field::server, "nano");
auto response_size (boost::beast::http::async_write (this_l->socket, continue_response, boost::asio::use_future));
header_response_bytes_written = response_size.get ();
}
}
else
{
header_error = ec;
this_l->node->logger.always_log ("RPC header error: ", ec.message ());
}
header_available_promise.set_value (header_response_bytes_written);
});
// Avait header
header_available.get ();
}
if (!header_error)
{
auto body_parser (std::make_shared<boost::beast::http::request_parser<boost::beast::http::string_body>> (std::move (*header_parser)));
boost::beast::http::async_read (socket, buffer, *body_parser, [this_l, body_parser](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
this_l->node->background ([this_l, body_parser]() {
auto & req (body_parser->get ());
auto start (std::chrono::steady_clock::now ());
auto version (req.version ());
std::string request_id (boost::str (boost::format ("%1%") % boost::io::group (std::hex, std::showbase, reinterpret_cast<uintptr_t> (this_l.get ()))));
auto response_handler ([this_l, version, start, request_id](boost::property_tree::ptree const & tree_a) {
std::stringstream ostream;
boost::property_tree::write_json (ostream, tree_a);
ostream.flush ();
auto body (ostream.str ());
this_l->write_result (body, version);
boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
this_l->write_completion_handler (this_l);
});
if (this_l->node->config.logging.log_rpc ())
{
this_l->node->logger.always_log (boost::str (boost::format ("RPC request %2% completed in: %1% microseconds") % std::chrono::duration_cast<std::chrono::microseconds> (std::chrono::steady_clock::now () - start).count () % request_id));
}
});
auto method = req.method ();
switch (method)
{
case boost::beast::http::verb::post:
{
auto handler (std::make_shared<nano::rpc_handler> (*this_l->node, this_l->rpc, req.body (), request_id, response_handler));
handler->process_request ();
break;
}
case boost::beast::http::verb::options:
{
this_l->prepare_head (version);
this_l->res.prepare_payload ();
boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
this_l->write_completion_handler (this_l);
});
break;
}
default:
{
error_response (response_handler, "Can only POST requests");
break;
}
}
});
}
else
{
this_l->node->logger.always_log ("RPC read error: ", ec.message ());
}
});
}
else
{
// Respond with the reason for the invalid header
auto response_handler ([this_l](boost::property_tree::ptree const & tree_a) {
std::stringstream ostream;
boost::property_tree::write_json (ostream, tree_a);
ostream.flush ();
auto body (ostream.str ());
this_l->write_result (body, 11);
boost::beast::http::async_write (this_l->socket, this_l->res, [this_l](boost::system::error_code const & ec, size_t bytes_transferred) {
this_l->write_completion_handler (this_l);
});
});
error_response (response_handler, std::string ("Invalid header: ") + header_error.message ());
}
}
void nano::rpc_connection::write_completion_handler (std::shared_ptr<nano::rpc_connection> rpc_connection)
{
// Intentional no-op
}

View file

@ -0,0 +1,27 @@
#pragma once
#include <atomic>
#include <boost/beast.hpp>
#include <nano/node/node.hpp>
namespace nano
{
class rpc;
class rpc_connection : public std::enable_shared_from_this<nano::rpc_connection>
{
public:
rpc_connection (nano::node &, nano::rpc &);
virtual ~rpc_connection () = default;
virtual void parse_connection ();
virtual void write_completion_handler (std::shared_ptr<nano::rpc_connection> rpc_connection);
virtual void prepare_head (unsigned version, boost::beast::http::status status = boost::beast::http::status::ok);
virtual void write_result (std::string body, unsigned version, boost::beast::http::status status = boost::beast::http::status::ok);
void read ();
std::shared_ptr<nano::node> node;
nano::rpc & rpc;
boost::asio::ip::tcp::socket socket;
boost::beast::flat_buffer buffer;
boost::beast::http::response<boost::beast::http::string_body> res;
std::atomic_flag responded;
};
}

View file

@ -0,0 +1,46 @@
#include <nano/rpc/rpc_connection_secure.hpp>
#include <nano/rpc/rpc_secure.hpp>
#include <boost/polymorphic_pointer_cast.hpp>
nano::rpc_connection_secure::rpc_connection_secure (nano::node & node_a, nano::rpc_secure & rpc_a) :
nano::rpc_connection (node_a, rpc_a),
stream (socket, rpc_a.ssl_context)
{
}
void nano::rpc_connection_secure::parse_connection ()
{
// Perform the SSL handshake
auto this_l = std::static_pointer_cast<nano::rpc_connection_secure> (shared_from_this ());
stream.async_handshake (boost::asio::ssl::stream_base::server,
[this_l](auto & ec) {
this_l->handle_handshake (ec);
});
}
void nano::rpc_connection_secure::on_shutdown (const boost::system::error_code & error)
{
// No-op. We initiate the shutdown (since the RPC server kills the connection after each request)
// and we'll thus get an expected EOF error. If the client disconnects, a short-read error will be expected.
}
void nano::rpc_connection_secure::handle_handshake (const boost::system::error_code & error)
{
if (!error)
{
read ();
}
else
{
node->logger.always_log ("TLS: Handshake error: ", error.message ());
}
}
void nano::rpc_connection_secure::write_completion_handler (std::shared_ptr<nano::rpc_connection> rpc)
{
auto rpc_connection_secure = boost::polymorphic_pointer_downcast<nano::rpc_connection_secure> (rpc);
rpc_connection_secure->stream.async_shutdown ([rpc_connection_secure](auto const & ec_shutdown) {
rpc_connection_secure->on_shutdown (ec_shutdown);
});
}

View file

@ -0,0 +1,27 @@
#pragma once
#include <boost/asio/ssl/stream.hpp>
#include <nano/rpc/rpc_connection.hpp>
namespace nano
{
class rpc_secure;
/**
* Specialization of nano::rpc_connection for establishing TLS connections.
* Handshakes with client certificates are supported.
*/
class rpc_connection_secure : public rpc_connection
{
public:
rpc_connection_secure (nano::node &, nano::rpc_secure &);
void parse_connection () override;
void write_completion_handler (std::shared_ptr<nano::rpc_connection> rpc) override;
/** The TLS handshake callback */
void handle_handshake (const boost::system::error_code & error);
/** The TLS async shutdown callback */
void on_shutdown (const boost::system::error_code & error);
private:
boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> stream;
};
}

4620
nano/rpc/rpc_handler.cpp Normal file

File diff suppressed because it is too large Load diff

View file

@ -1,90 +1,17 @@
#pragma once
#include <atomic>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <functional>
#include <nano/lib/blocks.hpp>
#include <nano/lib/config.hpp>
#include <nano/lib/errors.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <nano/node/rpcconfig.hpp>
#include <nano/secure/blockstore.hpp>
#include <nano/secure/utility.hpp>
#include <unordered_map>
#include <nano/node/wallet.hpp>
#include <string>
namespace nano
{
void error_response (std::function<void(boost::property_tree::ptree const &)> response_a, std::string const & message_a);
class node;
enum class payment_status
{
not_a_status,
unknown,
nothing, // Timeout and nothing was received
//insufficient, // Timeout and not enough was received
//over, // More than requested received
//success_fork, // Amount received but it involved a fork
success // Amount received
};
class wallet;
class payment_observer;
class rpc
{
public:
rpc (boost::asio::io_context &, nano::node &, nano::rpc_config const &);
virtual ~rpc () = default;
class rpc;
void error_response (std::function<void(boost::property_tree::ptree const &)> response_a, std::string const & message_a);
/**
* Start serving RPC requests if \p rpc_enabled_a, otherwise this will only
* add a block observer since requests may still arrive via IPC.
*/
void start (bool rpc_enabled_a = true);
void add_block_observer ();
virtual void accept ();
void stop ();
void observer_action (nano::account const &);
boost::asio::ip::tcp::acceptor acceptor;
std::mutex mutex;
std::unordered_map<nano::account, std::shared_ptr<nano::payment_observer>> payment_observers;
nano::rpc_config config;
nano::node & node;
bool on;
};
class rpc_connection : public std::enable_shared_from_this<nano::rpc_connection>
{
public:
rpc_connection (nano::node &, nano::rpc &);
virtual ~rpc_connection () = default;
virtual void parse_connection ();
virtual void write_completion_handler (std::shared_ptr<nano::rpc_connection> rpc_connection);
virtual void prepare_head (unsigned version, boost::beast::http::status status = boost::beast::http::status::ok);
virtual void write_result (std::string body, unsigned version, boost::beast::http::status status = boost::beast::http::status::ok);
void read ();
std::shared_ptr<nano::node> node;
nano::rpc & rpc;
boost::asio::ip::tcp::socket socket;
boost::beast::flat_buffer buffer;
boost::beast::http::response<boost::beast::http::string_body> res;
std::atomic_flag responded;
};
class payment_observer : public std::enable_shared_from_this<nano::payment_observer>
{
public:
payment_observer (std::function<void(boost::property_tree::ptree const &)> const &, nano::rpc &, nano::account const &, nano::amount const &);
~payment_observer ();
void start (uint64_t);
void observe ();
void complete (nano::payment_status);
std::mutex mutex;
std::condition_variable condition;
nano::rpc & rpc;
nano::account account;
nano::amount amount;
std::function<void(boost::property_tree::ptree const &)> response;
std::atomic_flag completed;
};
class rpc_handler : public std::enable_shared_from_this<nano::rpc_handler>
{
public:
@ -222,6 +149,4 @@ public:
uint64_t offset_optional_impl (uint64_t = 0);
bool rpc_control_impl ();
};
/** Returns the correct RPC implementation based on TLS configuration */
std::unique_ptr<nano::rpc> get_rpc (boost::asio::io_context & io_ctx_a, nano::node & node_a, nano::rpc_config const & config_a);
}

View file

@ -1,6 +1,6 @@
#include <boost/polymorphic_pointer_cast.hpp>
#include <nano/node/node.hpp>
#include <nano/node/rpc_secure.hpp>
#include <nano/rpc/rpc_connection_secure.hpp>
#include <nano/rpc/rpc_secure.hpp>
bool nano::rpc_secure::on_verify_certificate (bool preverified, boost::asio::ssl::verify_context & ctx)
{
@ -114,45 +114,3 @@ void nano::rpc_secure::accept ()
}
});
}
nano::rpc_connection_secure::rpc_connection_secure (nano::node & node_a, nano::rpc_secure & rpc_a) :
nano::rpc_connection (node_a, rpc_a),
stream (socket, rpc_a.ssl_context)
{
}
void nano::rpc_connection_secure::parse_connection ()
{
// Perform the SSL handshake
auto this_l = std::static_pointer_cast<nano::rpc_connection_secure> (shared_from_this ());
stream.async_handshake (boost::asio::ssl::stream_base::server,
[this_l](auto & ec) {
this_l->handle_handshake (ec);
});
}
void nano::rpc_connection_secure::on_shutdown (const boost::system::error_code & error)
{
// No-op. We initiate the shutdown (since the RPC server kills the connection after each request)
// and we'll thus get an expected EOF error. If the client disconnects, a short-read error will be expected.
}
void nano::rpc_connection_secure::handle_handshake (const boost::system::error_code & error)
{
if (!error)
{
read ();
}
else
{
node->logger.always_log ("TLS: Handshake error: ", error.message ());
}
}
void nano::rpc_connection_secure::write_completion_handler (std::shared_ptr<nano::rpc_connection> rpc)
{
auto rpc_connection_secure = boost::polymorphic_pointer_downcast<nano::rpc_connection_secure> (rpc);
rpc_connection_secure->stream.async_shutdown ([rpc_connection_secure](auto const & ec_shutdown) {
rpc_connection_secure->on_shutdown (ec_shutdown);
});
}

View file

@ -1,7 +1,7 @@
#pragma once
#include <boost/asio/ssl/context.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <nano/node/rpc.hpp>
#include <nano/rpc/rpc.hpp>
namespace nano
{
@ -28,23 +28,4 @@ public:
/** The context needs to be shared between sessions to make resumption work */
boost::asio::ssl::context ssl_context;
};
/**
* Specialization of nano::rpc_connection for establishing TLS connections.
* Handshakes with client certificates are supported.
*/
class rpc_connection_secure : public rpc_connection
{
public:
rpc_connection_secure (nano::node &, nano::rpc_secure &);
void parse_connection () override;
void write_completion_handler (std::shared_ptr<nano::rpc_connection> rpc) override;
/** The TLS handshake callback */
void handle_handshake (const boost::system::error_code & error);
/** The TLS async shutdown callback */
void on_shutdown (const boost::system::error_code & error);
private:
boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> stream;
};
}

View file

@ -0,0 +1,13 @@
add_executable (rpc_test
entry.cpp
rpc.cpp)
target_link_libraries(rpc_test gtest gtest_main rpc)
target_compile_definitions(rpc_test
PUBLIC
-DACTIVE_NETWORK=${ACTIVE_NETWORK}
PRIVATE
-DNANO_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR}
-DNANO_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR}
-DNANO_VERSION_PATCH=${CPACK_PACKAGE_VERSION_PATCH})

13
nano/rpc_test/entry.cpp Normal file
View file

@ -0,0 +1,13 @@
#include <gtest/gtest.h>
namespace nano
{
void force_nano_test_network ();
}
int main (int argc, char ** argv)
{
nano::force_nano_test_network ();
testing::InitGoogleTest (&argc, argv);
return RUN_ALL_TESTS ();
}

5123
nano/rpc_test/rpc.cpp Normal file

File diff suppressed because it is too large Load diff