diff --git a/.gitignore b/.gitignore index d7e79709..4500baf3 100644 --- a/.gitignore +++ b/.gitignore @@ -63,6 +63,9 @@ qrc_resources.cpp qt_system resources.qrc.depends +# Autogenerated Flatbuffers source files +nano/ipc_flatbuffers_lib/generated/flatbuffers/nanoapi_generated.h + # CMake artifacts _CPack_Packages CPack* diff --git a/.gitmodules b/.gitmodules index 5d31006c..20b18827 100644 --- a/.gitmodules +++ b/.gitmodules @@ -21,3 +21,6 @@ [submodule "nano-pow-server"] path = nano-pow-server url = https://github.com/nanocurrency/nano-pow-server.git +[submodule "flatbuffers"] + path = flatbuffers + url = https://github.com/google/flatbuffers.git diff --git a/CMakeLists.txt b/CMakeLists.txt index ce27035e..0318dbd6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -235,6 +235,9 @@ endif () include_directories(cpptoml/include) add_subdirectory(crypto/ed25519-donna) +add_subdirectory(nano/ipc_flatbuffers_lib) +add_subdirectory(nano/ipc_flatbuffers_test) + set (UPNPC_BUILD_SHARED OFF CACHE BOOL "") add_subdirectory (miniupnp/miniupnpc EXCLUDE_FROM_ALL) # FIXME: This fixes miniupnpc include directories without modifying miniupnpc's diff --git a/api/flatbuffers/nanoapi.fbs b/api/flatbuffers/nanoapi.fbs new file mode 100755 index 00000000..baf776f1 --- /dev/null +++ b/api/flatbuffers/nanoapi.fbs @@ -0,0 +1,256 @@ +/* + Flatbuffer schema for the Nano IPC API. + Please see https://google.github.io/flatbuffers/md__schemas.html for recommended schema evolution practices. + */ +namespace nanoapi; + +/** Returns the voting weight for the given account */ +table AccountWeight { + /** A nano_ address */ + account: string (required); +} + +/** Response to AccountWeight */ +table AccountWeightResponse { + /** Voting weight as a decimal number*/ + voting_weight: string (required); +} + +/** + * Block subtype for state blocks. + * Note that the node makes no distinction between open and receive subtypes. + */ +enum BlockSubType: byte { + invalid = 0, + receive, + send, + change, + epoch +} + +/** Block union */ +union Block { + BlockState, + BlockOpen, + BlockReceive, + BlockSend, + BlockChange +} + +table BlockOpen { + /** Hash of this block */ + hash: string; + /** Account being opened */ + account: string; + /** Hash of send block */ + source: string; + /** Representative address */ + representative: string; + /** Signature as a hex string */ + signature: string; + /** Work is a hex string representing work. This is a string as the work value may not fit in native numeric types. */ + work: string; +} + +table BlockReceive { + /** Hash of this block */ + hash: string; + /** Hash of previous block */ + previous: string; + /** Source hash */ + source: string; + /** Signature as a hex string */ + signature: string; + /** Work is a hex string representing work. This is a string as the work value may not fit in native numeric types. */ + work: string; +} + +table BlockSend { + /** Hash of this block */ + hash: string; + /** Hash of previous block */ + previous: string; + /** Destination account */ + destination: string; + /** Balance in raw */ + balance: string; + /** Signature as a hex string */ + signature: string; + /** Work is a hex string representing work. This is a string as the work value may not fit in native numeric types. */ + work: string; +} + +table BlockChange { + /** Hash of this block */ + hash: string; + /** Hash of previous block */ + previous: string; + /** Representative address */ + representative: string; + /** Signature as a hex string */ + signature: string; + /** Work is a hex string representing work. This is a string as the work value may not fit in native numeric types. */ + work: string; +} + +table BlockState { + /** Hash of this block */ + hash: string; + /** Account as nano_ string */ + account: string; + /** Hash of previous block */ + previous: string; + /** Representative as nano_ string */ + representative: string; + /** Balance in raw */ + balance: string; + /** Link as a hex string */ + link: string; + /** Link interpreted as a nano_ address */ + link_as_account: string; + /** Signature as a hex string */ + signature: string; + /** Work is a hex string representing work. This is a string as the work value may not fit in native numeric types. */ + work: string; + /** Subtype of this state block */ + subtype: BlockSubType; +} + +/** Information about a block */ +table BlockInfo { + block: Block; +} + +/** Called by a service (usually an external process) to register itself */ +table ServiceRegister { + service_name: string; +} + +/** Request the node to send an EventServiceStop event to the given service */ +table ServiceStop { + /** Name of service to stop. */ + service_name: string (required); + /** If true, restart the service */ + restart: bool = false; +} + +/** + * Subscribe or unsubscribe to EventServiceStop events. + * The service must first have registered itself on the same session. + */ +table TopicServiceStop { + /** Set to true to unsubscribe */ + unsubscribe: bool; +} + +/** Sent to a service to request it to stop itself */ +table EventServiceStop { +} + +/** + * All subscriptions are acknowledged. Use the envelope's correlation id + * if you need to match the ack with the subscription. + */ +table EventAck { +} + +/** Requested confirmation type */ +enum TopicConfirmationTypeFilter : byte { all, active, active_quorum, active_confirmation_height, inactive } + +/** Type of block confirmation */ +enum TopicConfirmationType : byte { active_quorum, active_confirmation_height, inactive } + +/** Subscribe or unsubscribe to block confirmations of type EventConfirmation */ +table TopicConfirmation { + /** Set to true to unsubscribe */ + unsubscribe: bool; + options: TopicConfirmationOptions; +} + +table TopicConfirmationOptions { + confirmation_type_filter: TopicConfirmationTypeFilter = all; + all_local_accounts: bool; + accounts: [string]; + include_block: bool = true; + include_election_info: bool = true; +} + +/** Notification of block confirmation. */ +table EventConfirmation { + confirmation_type: TopicConfirmationType; + account: string; + amount: string; + hash: string; + block: Block; + election_info: ElectionInfo; +} + +table ElectionInfo { + duration: uint64; + time: uint64; + tally: string; + request_count: uint64; + block_count: uint64; + voter_count: uint64; +} + +/** Error response. All fields are optional */ +table Error { + /** Error code. May be negative or positive. */ + code: int; + /** Error category code */ + category: int; + /** Error message */ + message: string; +} + +/** + * A general purpose success response for messages that don't return a message. + * The response includes an optional message text. + */ +table Success { + message: string; +} + +/** IsAlive request and response. Any node issues will be reported through an error in the envelope. */ +table IsAlive { +} + +/** + * A union is the idiomatic way in Flatbuffers to transmit messages of multiple types. + * All top-level message types (including response types) must be listed here. + * @warning To ensure compatibility, only append and deprecate message types. + */ +union Message { + Error, + Success, + IsAlive, + EventAck, + BlockInfo, + AccountWeight, + AccountWeightResponse, + TopicConfirmation, + EventConfirmation, + ServiceRegister, + ServiceStop, + TopicServiceStop, + EventServiceStop +} + +/** + * All messages are wrapped in an envelope which contains information such as + * message type, credentials and correlation id. For responses, the message may be an Error. + */ +table Envelope { + /** Milliseconds since epoch when the message was sent. */ + time: uint64; + /** An optional and arbitrary string used for authentication. The corresponding http header for api keys is "nano-api-key" */ + credentials: string; + /** Correlation id is an optional and arbitrary string. The corresponding http header is "nano-correlation-id" */ + correlation_id: string; + /** The contained message. A 'message_type' property will be automatically added to JSON messages. */ + message: Message; +} + +/** The Envelope is the type marshalled over IPC, and also serves as the top-level JSON type */ +root_type Envelope; diff --git a/docker/node/Dockerfile b/docker/node/Dockerfile index 0248c2d0..ef264309 100644 --- a/docker/node/Dockerfile +++ b/docker/node/Dockerfile @@ -24,6 +24,7 @@ RUN groupadd --gid 1000 nanocurrency && \ COPY --from=0 /tmp/build/nano_node /usr/bin COPY --from=0 /tmp/build/nano_rpc /usr/bin COPY --from=0 /tmp/build/nano_pow_server /usr/bin +COPY --from=0 /tmp/src/api/ /usr/bin/api/ COPY --from=0 /etc/nano-network /etc COPY docker/node/entry.sh /usr/bin/entry.sh COPY docker/node/config /usr/share/nano/config diff --git a/flatbuffers b/flatbuffers new file mode 160000 index 00000000..3b458f7a --- /dev/null +++ b/flatbuffers @@ -0,0 +1 @@ +Subproject commit 3b458f7a170154ed4c4a3a2a9f6116fb2d415ad5 diff --git a/nano/core_test/ipc.cpp b/nano/core_test/ipc.cpp index 6a161ef5..f789fc9f 100644 --- a/nano/core_test/ipc.cpp +++ b/nano/core_test/ipc.cpp @@ -1,6 +1,8 @@ #include #include -#include +#include +#include +#include #include #include @@ -24,7 +26,7 @@ TEST (ipc, asynchronous) nano::ipc::ipc_server ipc (*system.nodes[0], node_rpc_config); nano::ipc::ipc_client client (system.nodes[0]->io_ctx); - auto req (nano::ipc::prepare_request (nano::ipc::payload_encoding::json_legacy, std::string (R"({"action": "block_count"})"))); + auto req (nano::ipc::prepare_request (nano::ipc::payload_encoding::json_v1, std::string (R"({"action": "block_count"})"))); auto res (std::make_shared> ()); std::atomic call_completed{ false }; client.async_connect ("::1", 24077, [&client, &req, &res, &call_completed](nano::error err) { @@ -56,6 +58,7 @@ TEST (ipc, asynchronous) { ASSERT_NO_ERROR (system.poll ()); } + ipc.stop (); } TEST (ipc, synchronous) @@ -71,7 +74,7 @@ TEST (ipc, synchronous) std::atomic call_completed{ false }; std::thread client_thread ([&client, &call_completed]() { client.connect ("::1", 24077); - std::string response (nano::ipc::request (client, std::string (R"({"action": "block_count"})"))); + std::string response (nano::ipc::request (nano::ipc::payload_encoding::json_v1, client, std::string (R"({"action": "block_count"})"))); std::stringstream ss; ss << response; // Make sure the response is valid json @@ -88,6 +91,7 @@ TEST (ipc, synchronous) { ASSERT_NO_ERROR (system.poll ()); } + ipc.stop (); } TEST (ipc, config_upgrade_v0_v1) @@ -108,3 +112,107 @@ TEST (ipc, config_upgrade_v0_v1) ASSERT_LE (1, local2.get ("version")); ASSERT_FALSE (local2.get ("allow_unsafe")); } + +TEST (ipc, permissions_default_user) +{ + // Test empty/nonexistant access config. The default user still exists with default permissions. + std::stringstream ss; + ss << R"toml( + )toml"; + + nano::tomlconfig toml; + toml.read (ss); + + nano::ipc::access access; + access.deserialize_toml (toml); + ASSERT_TRUE (access.has_access ("", nano::ipc::access_permission::api_account_weight)); +} + +TEST (ipc, permissions_deny_default) +{ + // All users have api_account_weight permissions by default. This removes the permission for a specific user. + std::stringstream ss; + ss << R"toml( + [[user]] + id = "user1" + deny = "api_account_weight" + )toml"; + + nano::tomlconfig toml; + toml.read (ss); + + nano::ipc::access access; + access.deserialize_toml (toml); + ASSERT_FALSE (access.has_access ("user1", nano::ipc::access_permission::api_account_weight)); +} + +TEST (ipc, permissions_groups) +{ + // Make sure role permissions are adopted by user + std::stringstream ss; + ss << R"toml( + [[role]] + id = "mywalletadmin" + allow = "wallet_read, wallet_write" + + [[user]] + id = "user1" + roles = "mywalletadmin" + deny = "api_account_weight" + )toml"; + + nano::tomlconfig toml; + toml.read (ss); + + nano::ipc::access access; + access.deserialize_toml (toml); + ASSERT_FALSE (access.has_access ("user1", nano::ipc::access_permission::api_account_weight)); + ASSERT_TRUE (access.has_access_to_all ("user1", { nano::ipc::access_permission::wallet_read, nano::ipc::access_permission::wallet_write })); +} + +TEST (ipc, permissions_oneof) +{ + // Test one of two permissions + std::stringstream ss; + ss << R"toml( + [[user]] + id = "user1" + allow = "api_account_weight" + [[user]] + id = "user2" + allow = "api_account_weight, account_query" + [[user]] + id = "user3" + deny = "api_account_weight, account_query" + )toml"; + + nano::tomlconfig toml; + toml.read (ss); + + nano::ipc::access access; + access.deserialize_toml (toml); + ASSERT_TRUE (access.has_access ("user1", nano::ipc::access_permission::api_account_weight)); + ASSERT_TRUE (access.has_access ("user2", nano::ipc::access_permission::api_account_weight)); + ASSERT_FALSE (access.has_access ("user3", nano::ipc::access_permission::api_account_weight)); + ASSERT_TRUE (access.has_access_to_oneof ("user1", { nano::ipc::access_permission::account_query, nano::ipc::access_permission::api_account_weight })); + ASSERT_TRUE (access.has_access_to_oneof ("user2", { nano::ipc::access_permission::account_query, nano::ipc::access_permission::api_account_weight })); + ASSERT_FALSE (access.has_access_to_oneof ("user3", { nano::ipc::access_permission::account_query, nano::ipc::access_permission::api_account_weight })); +} + +TEST (ipc, permissions_default_user_order) +{ + // If changing the default user, it must come first + std::stringstream ss; + ss << R"toml( + [[user]] + id = "user1" + [[user]] + id = "" + )toml"; + + nano::tomlconfig toml; + toml.read (ss); + + nano::ipc::access access; + ASSERT_TRUE (access.deserialize_toml (toml)); +} diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index eba26404..76de566d 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -225,6 +225,8 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.ipc_config.transport_tcp.io_timeout, defaults.node.ipc_config.transport_tcp.io_timeout); ASSERT_EQ (conf.node.ipc_config.transport_tcp.io_threads, defaults.node.ipc_config.transport_tcp.io_threads); ASSERT_EQ (conf.node.ipc_config.transport_tcp.port, defaults.node.ipc_config.transport_tcp.port); + ASSERT_EQ (conf.node.ipc_config.flatbuffers.skip_unexpected_fields_in_json, defaults.node.ipc_config.flatbuffers.skip_unexpected_fields_in_json); + ASSERT_EQ (conf.node.ipc_config.flatbuffers.verify_buffers, defaults.node.ipc_config.flatbuffers.verify_buffers); ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.enable, defaults.node.diagnostics_config.txn_tracking.enable); ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time, defaults.node.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time); @@ -440,6 +442,10 @@ TEST (toml, daemon_config_deserialize_no_defaults) io_threads = 999 port = 999 + [node.ipc.flatbuffers] + skip_unexpected_fields_in_json = false + verify_buffers = false + [node.logging] bulk_pull = true flush = false @@ -615,6 +621,8 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.ipc_config.transport_tcp.io_timeout, defaults.node.ipc_config.transport_tcp.io_timeout); ASSERT_NE (conf.node.ipc_config.transport_tcp.io_threads, defaults.node.ipc_config.transport_tcp.io_threads); ASSERT_NE (conf.node.ipc_config.transport_tcp.port, defaults.node.ipc_config.transport_tcp.port); + ASSERT_NE (conf.node.ipc_config.flatbuffers.skip_unexpected_fields_in_json, defaults.node.ipc_config.flatbuffers.skip_unexpected_fields_in_json); + ASSERT_NE (conf.node.ipc_config.flatbuffers.verify_buffers, defaults.node.ipc_config.flatbuffers.verify_buffers); ASSERT_NE (conf.node.diagnostics_config.txn_tracking.enable, defaults.node.diagnostics_config.txn_tracking.enable); ASSERT_NE (conf.node.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time, defaults.node.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time); diff --git a/nano/ipc_flatbuffers_lib/CMakeLists.txt b/nano/ipc_flatbuffers_lib/CMakeLists.txt new file mode 100644 index 00000000..9d3a8fba --- /dev/null +++ b/nano/ipc_flatbuffers_lib/CMakeLists.txt @@ -0,0 +1,49 @@ +# Build flatc from the Flatbuffers submodule +set (FLATBUFFERS_BUILD_TESTS OFF CACHE BOOL "") +set (FLATBUFFERS_BUILD_FLATHASH OFF CACHE BOOL "") +mark_as_advanced ( + FLATBUFFERS_BUILD_CPP17 + FLATBUFFERS_BUILD_FLATC + FLATBUFFERS_BUILD_FLATHASH + FLATBUFFERS_BUILD_FLATLIB + FLATBUFFERS_BUILD_GRPCTEST + FLATBUFFERS_BUILD_LEGACY + FLATBUFFERS_BUILD_SHAREDLIB + FLATBUFFERS_BUILD_TESTS + FLATBUFFERS_CODE_COVERAGE + FLATBUFFERS_CODE_SANITIZE + FLATBUFFERS_INSTALL + FLATBUFFERS_LIBCXX_WITH_CLANG + FLATBUFFERS_PACKAGE_DEBIAN + FLATBUFFERS_PACKAGE_REDHAT + FLATBUFFERS_STATIC_FLATC) +add_subdirectory(../../flatbuffers ${CMAKE_CURRENT_BINARY_DIR}/flatbuffers-build EXCLUDE_FROM_ALL) + +# Generate Flatbuffers files into the ipc_flatbuffers_lib library, which will be rebuilt +# whenever any of the fbs files change. Note that while this supports multiple fbs files, +# we currently only use one, to avoid include-file issues with certain language bindings. +file(MAKE_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/generated/flatbuffers) +if (APPLE) + install (DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../api/flatbuffers/ DESTINATION Nano.app/Contents/MacOS/api/flatbuffers) +else() + install (DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../api/flatbuffers/ DESTINATION ./api/flatbuffers) +endif() + +file(GLOB files "${CMAKE_CURRENT_SOURCE_DIR}/../../api/flatbuffers/nanoapi*.fbs") +foreach(file ${files}) + get_filename_component(flatbuffers_filename ${file} NAME_WE) + message("Generating flatbuffers code for: ${flatbuffers_filename} into ${CMAKE_CURRENT_SOURCE_DIR}/generated/flatbuffers") + add_custom_command( + OUTPUT ${CMAKE_CURRENT_SOURCE_DIR}/generated/flatbuffers/${flatbuffers_filename}_generated.h + COMMAND "$" --force-empty-vectors --reflect-names --gen-mutable --gen-name-strings --gen-object-api --strict-json --cpp -o ${CMAKE_CURRENT_SOURCE_DIR}/generated/flatbuffers ${CMAKE_CURRENT_SOURCE_DIR}/../../api/flatbuffers/${flatbuffers_filename}.fbs + DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/../../api/flatbuffers/${flatbuffers_filename}.fbs + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) +endforeach() + +add_library (ipc_flatbuffers_lib + generated/flatbuffers/nanoapi_generated.h + flatbuffer_producer.hpp + flatbuffer_producer.cpp +) + +target_link_libraries (ipc_flatbuffers_lib flatbuffers) diff --git a/nano/ipc_flatbuffers_lib/flatbuffer_producer.cpp b/nano/ipc_flatbuffers_lib/flatbuffer_producer.cpp new file mode 100644 index 00000000..9a8692fe --- /dev/null +++ b/nano/ipc_flatbuffers_lib/flatbuffer_producer.cpp @@ -0,0 +1,35 @@ +#include + +nano::ipc::flatbuffer_producer::flatbuffer_producer () +{ + fbb = std::make_shared (); +} + +nano::ipc::flatbuffer_producer::flatbuffer_producer (std::shared_ptr const & builder_a) : +fbb (builder_a) +{ +} + +void nano::ipc::flatbuffer_producer::make_error (int code, std::string const & message) +{ + auto msg = fbb->CreateString (message); + nanoapi::ErrorBuilder builder (*fbb); + builder.add_code (code); + builder.add_message (msg); + create_builder_response (builder); +} + +void nano::ipc::flatbuffer_producer::set_correlation_id (std::string const & correlation_id_a) +{ + correlation_id = correlation_id_a; +} + +void nano::ipc::flatbuffer_producer::set_credentials (std::string const & credentials_a) +{ + credentials = credentials_a; +} + +std::shared_ptr nano::ipc::flatbuffer_producer::get_shared_flatbuffer () const +{ + return fbb; +} diff --git a/nano/ipc_flatbuffers_lib/flatbuffer_producer.hpp b/nano/ipc_flatbuffers_lib/flatbuffer_producer.hpp new file mode 100644 index 00000000..b7a9c3d0 --- /dev/null +++ b/nano/ipc_flatbuffers_lib/flatbuffer_producer.hpp @@ -0,0 +1,91 @@ +#pragma once +#include + +#include +#include + +#include + +namespace nano +{ +namespace ipc +{ + /** Produces Nano API compliant Flatbuffers from objects and builders */ + class flatbuffer_producer + { + public: + flatbuffer_producer (); + flatbuffer_producer (std::shared_ptr const & builder_a); + + template + static std::shared_ptr make_buffer (T & object_a, std::string const & correlation_id_a = {}, std::string const & credentials_a = {}) + { + nano::ipc::flatbuffer_producer producer; + producer.set_correlation_id (correlation_id_a); + producer.set_credentials (credentials_a); + producer.create_response (object_a); + return producer.fbb; + } + + void make_error (int code, std::string const & message); + + /** Every message is put in an envelope, which contains the message type and other sideband information */ + template + auto make_envelope (flatbuffers::Offset obj) + { + auto correlation_id_string = fbb->CreateString (correlation_id); + auto credentials_string = fbb->CreateString (credentials); + nanoapi::EnvelopeBuilder envelope_builder (*fbb); + envelope_builder.add_time (std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()).count ()); + envelope_builder.add_message_type (nanoapi::MessageTraits::enum_value); + envelope_builder.add_message (obj.Union ()); + + if (!correlation_id.empty ()) + { + envelope_builder.add_correlation_id (correlation_id_string); + } + if (!credentials.empty ()) + { + envelope_builder.add_credentials (credentials_string); + } + return envelope_builder.Finish (); + } + + template + void create_response (flatbuffers::Offset offset) + { + auto root = make_envelope (offset); + fbb->Finish (root); + } + + template ::value>> + void create_response (T const & obj) + { + create_response (T::TableType::Pack (*fbb, &obj)); + } + + template + void create_builder_response (T builder) + { + auto offset = builder.Finish (); + auto root = make_envelope (offset); + fbb->Finish (root); + } + + /** Set the correlation id. This will be added to the envelope. */ + void set_correlation_id (std::string const & correlation_id_a); + /** Set the credentials. This will be added to the envelope. */ + void set_credentials (std::string const & credentials_a); + /** Returns the flatbuffer */ + std::shared_ptr get_shared_flatbuffer () const; + + private: + /** The builder managed by this instance */ + std::shared_ptr fbb; + /** Correlation id, if available */ + std::string correlation_id; + /** Credentials, if available */ + std::string credentials; + }; +} +} diff --git a/nano/ipc_flatbuffers_test/CMakeLists.txt b/nano/ipc_flatbuffers_test/CMakeLists.txt new file mode 100644 index 00000000..8e62516a --- /dev/null +++ b/nano/ipc_flatbuffers_test/CMakeLists.txt @@ -0,0 +1,12 @@ +add_executable (ipc_flatbuffers_test_client + entry.cpp) + +target_link_libraries (ipc_flatbuffers_test_client + nano_lib + Boost::filesystem + Boost::log_setup + Boost::log + Boost::program_options + Boost::system + Boost::thread + Boost::boost) diff --git a/nano/ipc_flatbuffers_test/entry.cpp b/nano/ipc_flatbuffers_test/entry.cpp new file mode 100644 index 00000000..1b3c7e1e --- /dev/null +++ b/nano/ipc_flatbuffers_test/entry.cpp @@ -0,0 +1,80 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include + +namespace +{ +void read_message_loop (std::shared_ptr const & connection) +{ + auto buffer (std::make_shared> ()); + connection->async_read_message (buffer, std::chrono::seconds::max (), [buffer, connection](nano::error error_a, size_t size_a) { + if (!error_a) + { + auto verifier (flatbuffers::Verifier (buffer->data (), buffer->size ())); + if (!nanoapi::VerifyEnvelopeBuffer (verifier)) + { + std::cerr << "Invalid message" << std::endl; + } + else + { + auto envelope (nanoapi::GetEnvelope (buffer->data ())); + if (envelope->message_type () == nanoapi::Message_EventConfirmation) + { + std::cout << "Confirmation at " << envelope->time () << std::endl; + auto conf (envelope->message_as_EventConfirmation ()); + std::cout << " Account : " << conf->account ()->str () << std::endl; + std::cout << " Amount : " << conf->amount ()->str () << std::endl; + std::cout << " Block type : " << nanoapi::EnumNamesBlock ()[conf->block_type ()] << std::endl; + auto state_block = conf->block_as_BlockState (); + if (state_block) + { + std::cout << " Balance : " << state_block->balance ()->str () << std::endl; + } + } + + read_message_loop (connection); + } + } + }); +} +} + +/** A sample IPC/flatbuffers client that subscribes to confirmations from a local node */ +int main (int argc, char * const * argv) +{ + boost::asio::io_context io_ctx; + auto connection (std::make_shared (io_ctx)); + // The client only connects to a local live node for now; the test will + // be improved later to handle various options, including port and address. + std::string ipc_address = "::1"; + uint16_t ipc_port = 7077; + connection->async_connect (ipc_address, ipc_port, [connection](nano::error err) { + if (!err) + { + nanoapi::TopicConfirmationT conf; + connection->async_write (nano::ipc::shared_buffer_from (conf), [connection](nano::error err, size_t size) { + if (!err) + { + std::cout << "Awaiting confirmations..." << std::endl; + read_message_loop (connection); + } + }); + } + else + { + std::cerr << err.get_message () << std::endl; + } + }); + + io_ctx.run (); + return 0; +} diff --git a/nano/lib/CMakeLists.txt b/nano/lib/CMakeLists.txt index ad1b531d..ec94f810 100644 --- a/nano/lib/CMakeLists.txt +++ b/nano/lib/CMakeLists.txt @@ -70,6 +70,7 @@ target_link_libraries (nano_lib ed25519 crypto_lib blake2 + ipc_flatbuffers_lib ${CRYPTOPP_LIBRARY} ${CMAKE_DL_LIBS} Boost::boost) diff --git a/nano/lib/asio.hpp b/nano/lib/asio.hpp index ee939dd3..211a18ed 100644 --- a/nano/lib/asio.hpp +++ b/nano/lib/asio.hpp @@ -34,4 +34,16 @@ async_write (AsyncWriteStream & s, nano::shared_const_buffer const & buffer, Wri { return boost::asio::async_write (s, buffer, std::move (handler)); } -} \ No newline at end of file + +/** + * Alternative to nano::async_write where scatter/gather is desired for best performance, and where + * the buffer originates from Flatbuffers. + * @warning The buffers must be captured in the handler to ensure their lifetimes are properly extended. + */ +template +BOOST_ASIO_INITFN_RESULT_TYPE (WriteHandler, void(boost::system::error_code, std::size_t)) +unsafe_async_write (AsyncWriteStream & s, BufferType && buffer, WriteHandler && handler) +{ + return boost::asio::async_write (s, buffer, std::move (handler)); +} +} diff --git a/nano/lib/config.cpp b/nano/lib/config.cpp index 65b9f977..012921d5 100644 --- a/nano/lib/config.cpp +++ b/nano/lib/config.cpp @@ -60,4 +60,8 @@ std::string get_qtwallet_toml_config_path (boost::filesystem::path const & data_ { return (data_path / "config-qtwallet.toml").string (); } +std::string get_access_toml_config_path (boost::filesystem::path const & data_path) +{ + return (data_path / "config-access.toml").string (); +} } diff --git a/nano/lib/config.hpp b/nano/lib/config.hpp index 8f77931a..7a4f9157 100644 --- a/nano/lib/config.hpp +++ b/nano/lib/config.hpp @@ -175,6 +175,7 @@ std::string get_config_path (boost::filesystem::path const & data_path); std::string get_rpc_config_path (boost::filesystem::path const & data_path); std::string get_node_toml_config_path (boost::filesystem::path const & data_path); std::string get_rpc_toml_config_path (boost::filesystem::path const & data_path); +std::string get_access_toml_config_path (boost::filesystem::path const & data_path); std::string get_qtwallet_toml_config_path (boost::filesystem::path const & data_path); /** Called by gtest_main to enforce test network */ diff --git a/nano/lib/errors.cpp b/nano/lib/errors.cpp index 4a15fd4c..7722b595 100644 --- a/nano/lib/errors.cpp +++ b/nano/lib/errors.cpp @@ -10,6 +10,8 @@ std::string nano::error_common_messages::message (int ev) const { case nano::error_common::generic: return "Unknown error"; + case nano::error_common::access_denied: + return "Access denied"; case nano::error_common::missing_account: return "Missing account"; case nano::error_common::missing_balance: @@ -394,6 +396,11 @@ nano::error::operator std::error_code () const return code; } +int nano::error::error_code_as_int () const +{ + return code.value (); +} + /** Implicit bool conversion; true if there's an error */ nano::error::operator bool () const { diff --git a/nano/lib/errors.hpp b/nano/lib/errors.hpp index 031eb1db..a8ac11f5 100644 --- a/nano/lib/errors.hpp +++ b/nano/lib/errors.hpp @@ -17,6 +17,7 @@ enum class error_common { generic = 1, exception, + access_denied, account_not_found, account_not_found_wallet, account_exists, @@ -267,6 +268,11 @@ public: explicit operator bool () const; explicit operator std::string () const; std::string get_message () const; + /** + * The error code as an integer. Note that some error codes have platform dependent values. + * A return value of 0 signifies there is no error. + */ + int error_code_as_int () const; error & on_error (std::string message_a); error & on_error (std::error_code code_a, std::string message_a); error & set (std::string message_a, std::error_code code_a = nano::error_common::generic); diff --git a/nano/lib/ipc.hpp b/nano/lib/ipc.hpp index 60eace70..b9c69187 100644 --- a/nano/lib/ipc.hpp +++ b/nano/lib/ipc.hpp @@ -49,7 +49,7 @@ namespace ipc }; /** - * Payload encodings; add protobuf, flatbuffers and so on as needed. + * Payload encodings. */ enum class payload_encoding : uint8_t { @@ -57,9 +57,20 @@ namespace ipc * Request is preamble followed by 32-bit BE payload length and payload bytes. * Response is 32-bit BE payload length followed by payload bytes. */ - json_legacy = 0x1, - /** Request/response is same as json_legacy and exposes unsafe RPC's */ - json_unsafe = 0x2 + json_v1 = 0x1, + + /** Request/response is same as json_v1, but exposes unsafe RPC's */ + json_v1_unsafe = 0x2, + + /** + * Request is preamble followed by 32-bit BE payload length and payload bytes. + * Response is 32-bit BE payload length followed by payload bytes. + * Payloads must be flatbuffer encoded. + */ + flatbuffers = 0x3, + + /** JSON -> Flatbuffers -> JSON */ + flatbuffers_json = 0x4 }; /** IPC transport interface */ diff --git a/nano/lib/ipc_client.cpp b/nano/lib/ipc_client.cpp index 06005ccd..8f706a99 100644 --- a/nano/lib/ipc_client.cpp +++ b/nano/lib/ipc_client.cpp @@ -10,6 +10,7 @@ #include #include +#include #include namespace @@ -18,8 +19,18 @@ namespace class channel { public: - virtual void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) = 0; + virtual void async_read (std::shared_ptr> const & buffer_a, size_t size_a, std::function callback_a) = 0; virtual void async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a) = 0; + + /** + * Read a length-prefixed message asynchronously using the given timeout. This is suitable for full duplex scenarios where it may + * take an arbitrarily long time for the node to send messages for a given subscription. + * Received length must be a big endian 32-bit unsigned integer. + * @param buffer_a Receives the payload + * @param timeout_a How long to await message data. In some scenarios, such as waiting for data on subscriptions, specifying std::chrono::seconds::max() makes sense. + * @param callback_a If called without errors, the payload buffer is successfully populated + */ + virtual void async_read_message (std::shared_ptr> const & buffer_a, std::chrono::seconds timeout_a, std::function callback_a) = 0; }; /* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */ @@ -31,7 +42,7 @@ using socket_type = boost::asio::basic_stream_socket -class socket_client : public nano::ipc::socket_base, public channel +class socket_client : public nano::ipc::socket_base, public channel, public std::enable_shared_from_this> { public: socket_client (boost::asio::io_context & io_ctx_a, ENDPOINT_TYPE endpoint_a) : @@ -41,13 +52,14 @@ public: void async_resolve (std::string const & host_a, uint16_t port_a, std::function callback_a) { - this->timer_start (io_timeout); - resolver.async_resolve (boost::asio::ip::tcp::resolver::query (host_a, std::to_string (port_a)), [this, callback_a](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator endpoint_iterator_a) { - this->timer_cancel (); + auto this_l (this->shared_from_this ()); + this_l->timer_start (io_timeout); + resolver.async_resolve (boost::asio::ip::tcp::resolver::query (host_a, std::to_string (port_a)), [this_l, callback_a](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator endpoint_iterator_a) { + this_l->timer_cancel (); boost::asio::ip::tcp::resolver::iterator end; if (!ec && endpoint_iterator_a != end) { - endpoint = *endpoint_iterator_a; + this_l->endpoint = *endpoint_iterator_a; callback_a (ec, *endpoint_iterator_a); } else @@ -59,40 +71,117 @@ public: void async_connect (std::function callback_a) { - this->timer_start (io_timeout); - socket.async_connect (endpoint, boost::asio::bind_executor (strand, [this, callback_a](boost::system::error_code const & ec) { - this->timer_cancel (); + auto this_l (this->shared_from_this ()); + this_l->timer_start (io_timeout); + socket.async_connect (endpoint, boost::asio::bind_executor (strand, [this_l, callback_a](boost::system::error_code const & ec) { + this_l->timer_cancel (); callback_a (ec); })); } - void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) override + void async_read (std::shared_ptr> const & buffer_a, size_t size_a, std::function callback_a) override { - this->timer_start (io_timeout); + auto this_l (this->shared_from_this ()); + this_l->timer_start (io_timeout); buffer_a->resize (size_a); - boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), boost::asio::bind_executor (this->strand, [this, callback_a](boost::system::error_code const & ec, size_t size_a) { - this->timer_cancel (); + boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), boost::asio::bind_executor (this_l->strand, [this_l, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) { + this_l->timer_cancel (); callback_a (ec, size_a); })); } void async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a) override { - this->timer_start (io_timeout); - nano::async_write (socket, buffer_a, boost::asio::bind_executor (this->strand, [this, callback_a](boost::system::error_code const & ec, size_t size_a) { - this->timer_cancel (); - callback_a (ec, size_a); + auto this_l (this->shared_from_this ()); + boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l]() { + bool write_in_progress = !this_l->send_queue.empty (); + auto queue_size = this_l->send_queue.size (); + if (queue_size < this_l->queue_size_max) + { + this_l->send_queue.emplace_back (buffer_a, callback_a); + } + if (!write_in_progress) + { + this_l->write_queued_messages (); + } + })); + } + + void write_queued_messages () + { + auto this_l (this->shared_from_this ()); + auto msg (send_queue.front ()); + this_l->timer_start (io_timeout); + nano::async_write (socket, msg.buffer, + boost::asio::bind_executor (strand, + [msg, this_l](boost::system::error_code ec, std::size_t size_a) { + this_l->timer_cancel (); + + if (msg.callback) + { + msg.callback (ec, size_a); + } + + this_l->send_queue.pop_front (); + if (!ec && !this_l->send_queue.empty ()) + { + this_l->write_queued_messages (); + } + })); + } + + void async_read_message (std::shared_ptr> const & buffer_a, std::chrono::seconds timeout_a, std::function callback_a) override + { + auto this_l (this->shared_from_this ()); + this_l->timer_start (timeout_a); + buffer_a->resize (4); + // Read 32 bit big-endian length + boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), 4), boost::asio::bind_executor (this_l->strand, [this_l, timeout_a, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) { + this_l->timer_cancel (); + if (!ec) + { + uint32_t payload_size_l = boost::endian::big_to_native (*reinterpret_cast (buffer_a->data ())); + buffer_a->resize (payload_size_l); + // Read payload + this_l->timer_start (timeout_a); + this_l->async_read (buffer_a, payload_size_l, [this_l, buffer_a, callback_a](boost::system::error_code const & ec_a, size_t size_a) { + this_l->timer_cancel (); + callback_a (ec_a, size_a); + }); + } + else + { + callback_a (ec, size_a); + } })); } /** Shut down and close socket */ void close () override { - socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both); - socket.close (); + auto this_l (this->shared_from_this ()); + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l]() { + this_l->socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both); + this_l->socket.close (); + })); } private: + /** Holds the buffer and callback for queued writes */ + class queue_item + { + public: + queue_item (nano::shared_const_buffer const & buffer_a, std::function callback_a) : + buffer (buffer_a), callback (callback_a) + { + } + nano::shared_const_buffer buffer; + std::function callback; + }; + size_t const queue_size_max = 64 * 1024; + /** The send queue is protected by always being accessed through the strand */ + std::deque send_queue; + ENDPOINT_TYPE endpoint; SOCKET_TYPE socket; boost::asio::ip::tcp::resolver resolver; @@ -194,24 +283,50 @@ void nano::ipc::ipc_client::async_write (nano::shared_const_buffer const & buffe }); } -void nano::ipc::ipc_client::async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) +void nano::ipc::ipc_client::async_read (std::shared_ptr> const & buffer_a, size_t size_a, std::function callback_a) { auto client (boost::polymorphic_downcast (impl.get ())); - client->get_channel ().async_read (buffer_a, size_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_read_a) { + client->get_channel ().async_read (buffer_a, size_a, [callback_a, buffer_a](const boost::system::error_code & ec_a, size_t bytes_read_a) { callback_a (nano::error (ec_a), bytes_read_a); }); } +/** Read a length-prefixed message asynchronously. Received length must be a big endian 32-bit unsigned integer. */ +void nano::ipc::ipc_client::async_read_message (std::shared_ptr> const & buffer_a, std::chrono::seconds timeout_a, std::function callback_a) +{ + auto client (boost::polymorphic_downcast (impl.get ())); + client->get_channel ().async_read_message (buffer_a, timeout_a, [callback_a, buffer_a](const boost::system::error_code & ec_a, size_t bytes_read_a) { + callback_a (nano::error (ec_a), bytes_read_a); + }); +} + +std::vector nano::ipc::get_preamble (nano::ipc::payload_encoding encoding_a) +{ + std::vector buffer_l; + buffer_l.push_back ('N'); + buffer_l.push_back (static_cast (encoding_a)); + buffer_l.push_back (0); + buffer_l.push_back (0); + return buffer_l; +} + +nano::shared_const_buffer nano::ipc::prepare_flatbuffers_request (std::shared_ptr const & flatbuffer_a) +{ + auto buffer_l (get_preamble (nano::ipc::payload_encoding::flatbuffers)); + auto payload_length = static_cast (flatbuffer_a->GetSize ()); + uint32_t be = boost::endian::native_to_big (payload_length); + char * chars = reinterpret_cast (&be); + buffer_l.insert (buffer_l.end (), chars, chars + sizeof (uint32_t)); + buffer_l.insert (buffer_l.end (), flatbuffer_a->GetBufferPointer (), flatbuffer_a->GetBufferPointer () + flatbuffer_a->GetSize ()); + return nano::shared_const_buffer{ std::move (buffer_l) }; +} + nano::shared_const_buffer nano::ipc::prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a) { std::vector buffer_l; - if (encoding_a == nano::ipc::payload_encoding::json_legacy) + if (encoding_a == nano::ipc::payload_encoding::json_v1 || encoding_a == nano::ipc::payload_encoding::flatbuffers_json) { - buffer_l.push_back ('N'); - buffer_l.push_back (static_cast (encoding_a)); - buffer_l.push_back (0); - buffer_l.push_back (0); - + buffer_l = get_preamble (encoding_a); auto payload_length = static_cast (payload_a.size ()); uint32_t be = boost::endian::native_to_big (payload_length); char * chars = reinterpret_cast (&be); @@ -221,9 +336,9 @@ nano::shared_const_buffer nano::ipc::prepare_request (nano::ipc::payload_encodin return nano::shared_const_buffer{ std::move (buffer_l) }; } -std::string nano::ipc::request (nano::ipc::ipc_client & ipc_client, std::string const & rpc_action_a) +std::string nano::ipc::request (nano::ipc::payload_encoding encoding_a, nano::ipc::ipc_client & ipc_client, std::string const & rpc_action_a) { - auto req (prepare_request (nano::ipc::payload_encoding::json_legacy, rpc_action_a)); + auto req (prepare_request (encoding_a, rpc_action_a)); auto res (std::make_shared> ()); std::promise result_l; diff --git a/nano/lib/ipc_client.hpp b/nano/lib/ipc_client.hpp index c2669078..6fde3cc7 100644 --- a/nano/lib/ipc_client.hpp +++ b/nano/lib/ipc_client.hpp @@ -1,12 +1,17 @@ #pragma once +#include +#include #include #include #include +#include #include #include +#include + namespace nano { class shared_const_buffer; @@ -39,7 +44,17 @@ namespace ipc void async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a); /** Read \p size_a bytes asynchronously */ - void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a); + void async_read (std::shared_ptr> const & buffer_a, size_t size_a, std::function callback_a); + + /** + * Read a length-prefixed message asynchronously using the given timeout. This is suitable for full duplex scenarios where it may + * take an arbitrarily long time for the node to send messages for a given subscription. + * Received length must be a big endian 32-bit unsigned integer. + * @param buffer_a Receives the payload + * @param timeout_a How long to await message data. In some scenarios, such as waiting for data on subscriptions, specifying std::chrono::seconds::max() makes sense. + * @param callback_a If called without errors, the payload buffer is successfully populated + */ + void async_read_message (std::shared_ptr> const & buffer_a, std::chrono::seconds timeout_a, std::function callback_a); private: boost::asio::io_context & io_ctx; @@ -49,7 +64,24 @@ namespace ipc }; /** Convenience function for making synchronous IPC calls. The client must be connected */ - std::string request (nano::ipc::ipc_client & ipc_client, std::string const & rpc_action_a); + std::string request (nano::ipc::payload_encoding encoding_a, nano::ipc::ipc_client & ipc_client, std::string const & rpc_action_a); + + /** + * Returns a buffer with an IPC preamble for the given \p encoding_a + */ + std::vector get_preamble (nano::ipc::payload_encoding encoding_a); + + /** + * Returns a buffer with an IPC preamble, followed by 32-bit BE lenght, followed by payload + */ + nano::shared_const_buffer prepare_flatbuffers_request (std::shared_ptr const & flatbuffer_a); + + template + nano::shared_const_buffer shared_buffer_from (T & object_a, std::string const & correlation_id_a = {}, std::string const & credentials_a = {}) + { + auto buffer_l (nano::ipc::flatbuffer_producer::make_buffer (object_a, correlation_id_a, credentials_a)); + return nano::ipc::prepare_flatbuffers_request (buffer_l); + } /** * Returns a buffer with an IPC preamble for the given \p encoding_a followed by the payload. Depending on encoding, diff --git a/nano/lib/locks.hpp b/nano/lib/locks.hpp index 3d22dff4..5d29bad5 100644 --- a/nano/lib/locks.hpp +++ b/nano/lib/locks.hpp @@ -79,4 +79,69 @@ using unique_lock = std::unique_lock; // For consistency wrapping the less well known _any variant which can be used with any lockable type using condition_variable = std::condition_variable_any; + +/** A general purpose monitor template */ +template +class locked +{ +public: + template + locked (Args &&... args) : + obj (std::forward (args)...) + { + } + + struct scoped_lock final + { + scoped_lock (locked * owner_a) : + owner (owner_a) + { + owner->mutex.lock (); + } + + ~scoped_lock () + { + owner->mutex.unlock (); + } + + T * operator-> () + { + return &owner->obj; + } + + T & get () const + { + return owner->obj; + } + + locked * owner{ nullptr }; + }; + + scoped_lock operator-> () + { + return scoped_lock (this); + } + + T & operator= (T const & other) + { + nano::unique_lock lk (mutex); + obj = other; + return obj; + } + + operator T () const + { + return obj; + } + + /** Returns a scoped lock wrapper, allowing multiple calls to the underlying object under the same lock */ + scoped_lock lock () + { + return scoped_lock (this); + } + +private: + T obj; + std::mutex mutex; +}; } diff --git a/nano/lib/rpc_handler_interface.hpp b/nano/lib/rpc_handler_interface.hpp index 4a0011ea..fa25f560 100644 --- a/nano/lib/rpc_handler_interface.hpp +++ b/nano/lib/rpc_handler_interface.hpp @@ -1,17 +1,65 @@ #pragma once #include +#include +#include #include namespace nano { class rpc; +/** Keeps information about http requests, and for v2+ includes path and header values of interest */ +class rpc_handler_request_params final +{ +public: + int rpc_version{ 1 }; + std::string path; + std::string credentials; + std::string correlation_id; + + /** + * If the path is non-empty, this wraps the body inside an IPC API compliant envelope. + * Otherwise the input string is returned unchanged. + * This allows HTTP clients to use a simplified request format by omitting the envelope. + * Envelope fields may still be specified through corresponding nano- header fields. + */ + std::string json_envelope (std::string const & body_a) const + { + std::string body_l; + if (!path.empty ()) + { + std::ostringstream json; + json << "{"; + if (!credentials.empty ()) + { + json << "\"credentials\": \"" << credentials << "\", "; + } + if (!correlation_id.empty ()) + { + json << "\"correlation_id\": \"" << correlation_id << "\", "; + } + json << "\"message_type\": \"" << path << "\", "; + json << "\"message\": " << body_a; + json << "}"; + body_l = json.str (); + } + else + { + body_l = body_a; + } + return body_l; + } +}; + class rpc_handler_interface { public: virtual ~rpc_handler_interface () = default; + /** Process RPC 1.0 request. */ virtual void process_request (std::string const & action, std::string const & body, std::function response) = 0; + /** Process RPC 2.0 request. This is called via the IPC API */ + virtual void process_request_v2 (rpc_handler_request_params const & params_a, std::string const & body, std::function)> response) = 0; virtual void stop () = 0; virtual void rpc_instance (nano::rpc & rpc) = 0; }; diff --git a/nano/nano_node/daemon.cpp b/nano/nano_node/daemon.cpp index b4d85ca2..cd93200c 100644 --- a/nano/nano_node/daemon.cpp +++ b/nano/nano_node/daemon.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include #include @@ -105,7 +105,7 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano:: std::cout << error.get_message () << std::endl; std::exit (1); } - rpc_handler = std::make_unique (*node, config.rpc, [&ipc_server, &alarm, &io_ctx]() { + rpc_handler = std::make_unique (*node, ipc_server, config.rpc, [&ipc_server, &alarm, &io_ctx]() { ipc_server.stop (); alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (3), [&io_ctx]() { io_ctx.stop (); diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index c5d81710..fe564243 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/nano/nano_rpc/entry.cpp b/nano/nano_rpc/entry.cpp index fd2ad134..4ce9b875 100644 --- a/nano/nano_rpc/entry.cpp +++ b/nano/nano_rpc/entry.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/nano/nano_wallet/entry.cpp b/nano/nano_wallet/entry.cpp index 88302646..2528f68f 100644 --- a/nano/nano_wallet/entry.cpp +++ b/nano/nano_wallet/entry.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include #include @@ -170,7 +170,6 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost std::exit (1); #endif } - std::unique_ptr rpc; std::unique_ptr rpc_handler; if (config.rpc_enable) @@ -184,7 +183,7 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost { show_error (error.get_message ()); } - rpc_handler = std::make_unique (*node, config.rpc); + rpc_handler = std::make_unique (*node, ipc, config.rpc); rpc = nano::get_rpc (io_ctx, rpc_config, *rpc_handler); rpc->start (); } diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index f3900244..585bfa24 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -58,10 +58,20 @@ add_library (node election.cpp gap_cache.hpp gap_cache.cpp - ipc.hpp - ipc.cpp - ipcconfig.hpp - ipcconfig.cpp + ipc/action_handler.hpp + ipc/action_handler.cpp + ipc/flatbuffers_handler.hpp + ipc/flatbuffers_handler.cpp + ipc/flatbuffers_util.hpp + ipc/flatbuffers_util.cpp + ipc/ipc_access_config.hpp + ipc/ipc_access_config.cpp + ipc/ipc_broker.hpp + ipc/ipc_broker.cpp + ipc/ipc_config.hpp + ipc/ipc_config.cpp + ipc/ipc_server.hpp + ipc/ipc_server.cpp json_handler.hpp json_handler.cpp json_payment_observer.hpp @@ -158,3 +168,7 @@ target_compile_definitions(node PRIVATE -DTAG_VERSION_STRING=${TAG_VERSION_STRING} -DGIT_COMMIT_HASH=${GIT_COMMIT_HASH}) + +# This ensures that any changes to Flatbuffers source files will cause a +# regeneration of any C++ header files. +add_dependencies(node ipc_flatbuffers_lib) diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp deleted file mode 100644 index ab47a486..00000000 --- a/nano/node/ipc.cpp +++ /dev/null @@ -1,326 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include - -using namespace boost::log; - -namespace -{ -/** - * A session represents an inbound connection over which multiple requests/reponses are transmitted. - */ -template -class session : public nano::ipc::socket_base, public std::enable_shared_from_this> -{ -public: - session (nano::ipc::ipc_server & server_a, boost::asio::io_context & io_ctx_a, nano::ipc::ipc_config_transport & config_transport_a) : - socket_base (io_ctx_a), - server (server_a), node (server_a.node), session_id (server_a.id_dispenser.fetch_add (1)), io_ctx (io_ctx_a), socket (io_ctx_a), config_transport (config_transport_a) - { - if (node.config.logging.log_ipc ()) - { - node.logger.always_log ("IPC: created session with id: ", session_id); - } - } - - SOCKET_TYPE & get_socket () - { - return socket; - } - - /** - * Async read of exactly \p size_a bytes. The callback is invoked only when all the data is available and - * no error has occurred. On error, the error is logged, the read cycle stops and the session ends. Clients - * are expected to implement reconnect logic. - */ - void async_read_exactly (void * buff_a, size_t size_a, std::function const & callback_a) - { - async_read_exactly (buff_a, size_a, std::chrono::seconds (config_transport.io_timeout), callback_a); - } - - /** - * Async read of exactly \p size_a bytes and a specific \p timeout_a. - * @see async_read_exactly (void *, size_t, std::function) - */ - void async_read_exactly (void * buff_a, size_t size_a, std::chrono::seconds timeout_a, std::function const & callback_a) - { - timer_start (timeout_a); - auto this_l (this->shared_from_this ()); - boost::asio::async_read (socket, - boost::asio::buffer (buff_a, size_a), - boost::asio::transfer_exactly (size_a), - [this_l, callback_a](boost::system::error_code const & ec, size_t bytes_transferred_a) { - this_l->timer_cancel (); - if (ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset) - { - if (this_l->node.config.logging.log_ipc ()) - { - this_l->node.logger.always_log (boost::str (boost::format ("IPC: error reading %1% ") % ec.message ())); - } - } - else if (bytes_transferred_a > 0) - { - callback_a (); - } - }); - } - - /** Handler for payload_encoding::json_legacy */ - void handle_json_query (bool allow_unsafe) - { - session_timer.restart (); - auto request_id_l (std::to_string (server.id_dispenser.fetch_add (1))); - - // This is called when nano::rpc_handler#process_request is done. We convert to - // json and write the response to the ipc socket with a length prefix. - auto this_l (this->shared_from_this ()); - auto response_handler_l ([this_l, request_id_l](std::string const & body) { - auto big = boost::endian::native_to_big (static_cast (body.size ())); - std::vector buffer; - buffer.insert (buffer.end (), reinterpret_cast (&big), reinterpret_cast (&big) + sizeof (std::uint32_t)); - buffer.insert (buffer.end (), body.begin (), body.end ()); - if (this_l->node.config.logging.log_ipc ()) - { - this_l->node.logger.always_log (boost::str (boost::format ("IPC/RPC request %1% completed in: %2% %3%") % request_id_l % this_l->session_timer.stop ().count () % this_l->session_timer.unit ())); - } - - this_l->timer_start (std::chrono::seconds (this_l->config_transport.io_timeout)); - nano::async_write (this_l->socket, nano::shared_const_buffer (buffer), [this_l](boost::system::error_code const & error_a, size_t size_a) { - this_l->timer_cancel (); - if (!error_a) - { - this_l->read_next_request (); - } - else if (this_l->node.config.logging.log_ipc ()) - { - this_l->node.logger.always_log ("IPC: Write failed: ", error_a.message ()); - } - }); - - // Do not call any member variables here (like session_timer) as it's possible that the next request may already be underway. - }); - - node.stats.inc (nano::stat::type::ipc, nano::stat::detail::invocations); - auto body (std::string (reinterpret_cast (buffer.data ()), buffer.size ())); - - // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler - auto handler (std::make_shared (node, server.node_rpc_config, body, response_handler_l, [& server = server]() { - server.stop (); - server.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (3), [& io_ctx = server.node.alarm.io_ctx]() { - io_ctx.stop (); - }); - })); - // For unsafe actions to be allowed, the unsafe encoding must be used AND the transport config must allow it - handler->process_request (allow_unsafe && config_transport.allow_unsafe); - } - - /** Async request reader */ - void read_next_request () - { - auto this_l = this->shared_from_this (); - - // Await next request indefinitely - buffer.resize (sizeof (buffer_size)); - async_read_exactly (buffer.data (), buffer.size (), std::chrono::seconds::max (), [this_l]() { - if (this_l->buffer[nano::ipc::preamble_offset::lead] != 'N' || this_l->buffer[nano::ipc::preamble_offset::reserved_1] != 0 || this_l->buffer[nano::ipc::preamble_offset::reserved_2] != 0) - { - if (this_l->node.config.logging.log_ipc ()) - { - this_l->node.logger.always_log ("IPC: Invalid preamble"); - } - } - else if (this_l->buffer[nano::ipc::preamble_offset::encoding] == static_cast (nano::ipc::payload_encoding::json_legacy) || this_l->buffer[nano::ipc::preamble_offset::encoding] == static_cast (nano::ipc::payload_encoding::json_unsafe)) - { - auto allow_unsafe (this_l->buffer[nano::ipc::preamble_offset::encoding] == static_cast (nano::ipc::payload_encoding::json_unsafe)); - // Length of payload - this_l->async_read_exactly (&this_l->buffer_size, sizeof (this_l->buffer_size), [this_l, allow_unsafe]() { - boost::endian::big_to_native_inplace (this_l->buffer_size); - this_l->buffer.resize (this_l->buffer_size); - // Payload (ptree compliant JSON string) - this_l->async_read_exactly (this_l->buffer.data (), this_l->buffer_size, [this_l, allow_unsafe]() { - this_l->handle_json_query (allow_unsafe); - }); - }); - } - else if (this_l->node.config.logging.log_ipc ()) - { - this_l->node.logger.always_log ("IPC: Unsupported payload encoding"); - } - }); - } - - /** Shut down and close socket */ - void close () - { - socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both); - socket.close (); - } - -private: - nano::ipc::ipc_server & server; - nano::node & node; - - /** Unique session id used for logging */ - uint64_t session_id; - - /** Timer for measuring the duration of ipc calls */ - nano::timer session_timer; - - /** - * IO context from node, or per-transport, depending on configuration. - * Certain transports may scale better if they use a separate context. - */ - boost::asio::io_context & io_ctx; - - /** A socket of the given asio type */ - SOCKET_TYPE socket; - - /** Buffer sizes are read into this */ - uint32_t buffer_size{ 0 }; - - /** Buffer used to store data received from the client */ - std::vector buffer; - - /** Transport configuration */ - nano::ipc::ipc_config_transport & config_transport; -}; - -/** Domain and TCP socket transport */ -template -class socket_transport : public nano::ipc::transport -{ -public: - socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE endpoint_a, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) : - server (server_a), config_transport (config_transport_a) - { - // Using a per-transport event dispatcher? - if (concurrency_a > 0) - { - io_ctx = std::make_unique (); - } - - boost::asio::socket_base::reuse_address option (true); - boost::asio::socket_base::keep_alive option_keepalive (true); - acceptor = std::make_unique (context (), endpoint_a); - acceptor->set_option (option); - acceptor->set_option (option_keepalive); - accept (); - - // Start serving IO requests. If concurrency_a is < 1, the node's thread pool/io_context is used instead. - // A separate io_context for domain sockets may facilitate better performance on some systems. - if (concurrency_a > 0) - { - runner = std::make_unique (*io_ctx, static_cast (concurrency_a)); - } - } - - boost::asio::io_context & context () const - { - return io_ctx ? *io_ctx : server.node.io_ctx; - } - - void accept () - { - // Prepare the next session - auto new_session (std::make_shared> (server, context (), config_transport)); - - acceptor->async_accept (new_session->get_socket (), [this, new_session](boost::system::error_code const & ec) { - if (!ec) - { - new_session->read_next_request (); - } - else - { - server.node.logger.always_log ("IPC: acceptor error: ", ec.message ()); - } - - if (ec != boost::asio::error::operation_aborted && acceptor->is_open ()) - { - this->accept (); - } - else - { - server.node.logger.always_log ("IPC: shutting down"); - } - }); - } - - void stop () - { - acceptor->close (); - if (io_ctx) - { - io_ctx->stop (); - } - - if (runner) - { - runner->join (); - } - } - -private: - nano::ipc::ipc_server & server; - nano::ipc::ipc_config_transport & config_transport; - std::unique_ptr runner; - std::unique_ptr io_ctx; - std::unique_ptr acceptor; -}; -} - -nano::ipc::ipc_server::ipc_server (nano::node & node_a, nano::node_rpc_config const & node_rpc_config_a) : -node (node_a), -node_rpc_config (node_rpc_config_a) -{ - try - { - if (node_a.config.ipc_config.transport_domain.enabled) - { -#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) - auto threads = node_a.config.ipc_config.transport_domain.io_threads; - file_remover = std::make_unique (node_a.config.ipc_config.transport_domain.path); - boost::asio::local::stream_protocol::endpoint ep{ node_a.config.ipc_config.transport_domain.path }; - transports.push_back (std::make_shared> (*this, ep, node_a.config.ipc_config.transport_domain, threads)); -#else - node.logger.always_log ("IPC: Domain sockets are not supported on this platform"); -#endif - } - - if (node_a.config.ipc_config.transport_tcp.enabled) - { - auto threads = node_a.config.ipc_config.transport_tcp.io_threads; - transports.push_back (std::make_shared> (*this, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), node_a.config.ipc_config.transport_tcp.port), node_a.config.ipc_config.transport_tcp, threads)); - } - - node.logger.always_log ("IPC: server started"); - } - catch (std::runtime_error const & ex) - { - node.logger.always_log ("IPC: ", ex.what ()); - } -} - -nano::ipc::ipc_server::~ipc_server () -{ - node.logger.always_log ("IPC: server stopped"); -} - -void nano::ipc::ipc_server::stop () -{ - for (auto & transport : transports) - { - transport->stop (); - } -} diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp deleted file mode 100644 index 98cf07a3..00000000 --- a/nano/node/ipc.hpp +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include -#include - -#include - -namespace nano -{ -class node; - -namespace ipc -{ - /** The IPC server accepts connections on one or more configured transports */ - class ipc_server - { - public: - ipc_server (nano::node & node_a, nano::node_rpc_config const & node_rpc_config); - - virtual ~ipc_server (); - void stop (); - - nano::node & node; - nano::node_rpc_config const & node_rpc_config; - - /** Unique counter/id shared across sessions */ - std::atomic id_dispenser{ 0 }; - - private: - std::unique_ptr file_remover; - std::vector> transports; - }; -} -} diff --git a/nano/node/ipc/action_handler.cpp b/nano/node/ipc/action_handler.cpp new file mode 100644 index 00000000..bdc06656 --- /dev/null +++ b/nano/node/ipc/action_handler.cpp @@ -0,0 +1,183 @@ +#include +#include +#include +#include +#include +#include + +#include + +namespace +{ +nano::account parse_account (std::string const & account, bool & out_is_deprecated_format) +{ + nano::account result (0); + if (account.empty ()) + { + throw nano::error (nano::error_common::bad_account_number); + } + if (result.decode_account (account)) + { + throw nano::error (nano::error_common::bad_account_number); + } + else if (account[3] == '-' || account[4] == '-') + { + out_is_deprecated_format = true; + } + + return result; +} +/** Returns the message as a Flatbuffers ObjectAPI type, managed by a unique_ptr */ +template +auto get_message (nanoapi::Envelope const & envelope) +{ + auto raw (envelope.message_as ()->UnPack ()); + return std::unique_ptr (raw); +} +} + +/** + * Mapping from message type to handler function. + * @note This must be updated whenever a new message type is added to the Flatbuffers IDL. + */ +auto nano::ipc::action_handler::handler_map () -> std::unordered_map, nano::ipc::enum_hash> +{ + static std::unordered_map, nano::ipc::enum_hash> handlers; + if (handlers.empty ()) + { + handlers.emplace (nanoapi::Message::Message_IsAlive, &nano::ipc::action_handler::on_is_alive); + handlers.emplace (nanoapi::Message::Message_TopicConfirmation, &nano::ipc::action_handler::on_topic_confirmation); + handlers.emplace (nanoapi::Message::Message_AccountWeight, &nano::ipc::action_handler::on_account_weight); + handlers.emplace (nanoapi::Message::Message_ServiceRegister, &nano::ipc::action_handler::on_service_register); + handlers.emplace (nanoapi::Message::Message_ServiceStop, &nano::ipc::action_handler::on_service_stop); + handlers.emplace (nanoapi::Message::Message_TopicServiceStop, &nano::ipc::action_handler::on_topic_service_stop); + } + return handlers; +} + +nano::ipc::action_handler::action_handler (nano::node & node_a, nano::ipc::ipc_server & server_a, std::weak_ptr const & subscriber_a, std::shared_ptr const & builder_a) : +flatbuffer_producer (builder_a), +node (node_a), +ipc_server (server_a), +subscriber (subscriber_a) +{ +} + +void nano::ipc::action_handler::on_topic_confirmation (nanoapi::Envelope const & envelope_a) +{ + auto confirmationTopic (get_message (envelope_a)); + ipc_server.get_broker ().subscribe (subscriber, std::move (confirmationTopic)); + nanoapi::EventAckT ack; + create_response (ack); +} + +void nano::ipc::action_handler::on_service_register (nanoapi::Envelope const & envelope_a) +{ + require_oneof (envelope_a, { nano::ipc::access_permission::api_service_register, nano::ipc::access_permission::service }); + auto query (get_message (envelope_a)); + ipc_server.get_broker ().service_register (query->service_name, this->subscriber); + nanoapi::SuccessT success; + create_response (success); +} + +void nano::ipc::action_handler::on_service_stop (nanoapi::Envelope const & envelope_a) +{ + require_oneof (envelope_a, { nano::ipc::access_permission::api_service_stop, nano::ipc::access_permission::service }); + auto query (get_message (envelope_a)); + if (query->service_name == "node") + { + ipc_server.node.stop (); + } + else + { + ipc_server.get_broker ().service_stop (query->service_name); + } + nanoapi::SuccessT success; + create_response (success); +} + +void nano::ipc::action_handler::on_topic_service_stop (nanoapi::Envelope const & envelope_a) +{ + auto topic (get_message (envelope_a)); + ipc_server.get_broker ().subscribe (subscriber, std::move (topic)); + nanoapi::EventAckT ack; + create_response (ack); +} + +void nano::ipc::action_handler::on_account_weight (nanoapi::Envelope const & envelope_a) +{ + require_oneof (envelope_a, { nano::ipc::access_permission::api_account_weight, nano::ipc::access_permission::account_query }); + bool is_deprecated_format{ false }; + auto query (get_message (envelope_a)); + auto balance (node.weight (parse_account (query->account, is_deprecated_format))); + + nanoapi::AccountWeightResponseT response; + response.voting_weight = balance.str (); + create_response (response); +} + +void nano::ipc::action_handler::on_is_alive (nanoapi::Envelope const & envelope) +{ + nanoapi::IsAliveT alive; + create_response (alive); +} + +bool nano::ipc::action_handler::has_access (nanoapi::Envelope const & envelope_a, nano::ipc::access_permission permission_a) const noexcept +{ + // If credentials are missing in the envelope, the default user is used + std::string credentials; + if (envelope_a.credentials () != nullptr) + { + credentials = envelope_a.credentials ()->str (); + } + + return ipc_server.get_access ().has_access (credentials, permission_a); +} + +bool nano::ipc::action_handler::has_access_to_all (nanoapi::Envelope const & envelope_a, std::initializer_list permissions_a) const noexcept +{ + // If credentials are missing in the envelope, the default user is used + std::string credentials; + if (envelope_a.credentials () != nullptr) + { + credentials = envelope_a.credentials ()->str (); + } + + return ipc_server.get_access ().has_access_to_all (credentials, permissions_a); +} + +bool nano::ipc::action_handler::has_access_to_oneof (nanoapi::Envelope const & envelope_a, std::initializer_list permissions_a) const noexcept +{ + // If credentials are missing in the envelope, the default user is used + std::string credentials; + if (envelope_a.credentials () != nullptr) + { + credentials = envelope_a.credentials ()->str (); + } + + return ipc_server.get_access ().has_access_to_oneof (credentials, permissions_a); +} + +void nano::ipc::action_handler::require (nanoapi::Envelope const & envelope_a, nano::ipc::access_permission permission_a) const +{ + if (!has_access (envelope_a, permission_a)) + { + throw nano::error (nano::error_common::access_denied); + } +} + +void nano::ipc::action_handler::require_all (nanoapi::Envelope const & envelope_a, std::initializer_list permissions_a) const +{ + if (!has_access_to_all (envelope_a, permissions_a)) + { + throw nano::error (nano::error_common::access_denied); + } +} + +void nano::ipc::action_handler::require_oneof (nanoapi::Envelope const & envelope_a, std::initializer_list permissions_a) const +{ + if (!has_access_to_oneof (envelope_a, permissions_a)) + { + throw nano::error (nano::error_common::access_denied); + } +} diff --git a/nano/node/ipc/action_handler.hpp b/nano/node/ipc/action_handler.hpp new file mode 100644 index 00000000..63c51c57 --- /dev/null +++ b/nano/node/ipc/action_handler.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace nano +{ +class error; +class node; +namespace ipc +{ + class ipc_server; + class subscriber; + + /** + * Implements handlers for the various public IPC messages. When an action handler is completed, + * the flatbuffer contains the serialized response object. + * @note This is a light-weight class, and an instance can be created for every request. + */ + class action_handler final : public flatbuffer_producer, public std::enable_shared_from_this + { + public: + action_handler (nano::node & node, nano::ipc::ipc_server & server, std::weak_ptr const & subscriber, std::shared_ptr const & builder); + + void on_account_weight (nanoapi::Envelope const & envelope); + void on_is_alive (nanoapi::Envelope const & envelope); + void on_topic_confirmation (nanoapi::Envelope const & envelope); + + /** Request to register a service. The service name is associated with the current session. */ + void on_service_register (nanoapi::Envelope const & envelope); + + /** Request to stop a service by name */ + void on_service_stop (nanoapi::Envelope const & envelope); + + /** Subscribe to the ServiceStop event. The service must first have registered itself on the same session. */ + void on_topic_service_stop (nanoapi::Envelope const & envelope); + + /** Returns a mapping from api message types to handler functions */ + static auto handler_map () -> std::unordered_map, nano::ipc::enum_hash>; + + private: + bool has_access (nanoapi::Envelope const & envelope_a, nano::ipc::access_permission permission_a) const noexcept; + bool has_access_to_all (nanoapi::Envelope const & envelope_a, std::initializer_list permissions_a) const noexcept; + bool has_access_to_oneof (nanoapi::Envelope const & envelope_a, std::initializer_list permissions_a) const noexcept; + void require (nanoapi::Envelope const & envelope_a, nano::ipc::access_permission permission_a) const; + void require_all (nanoapi::Envelope const & envelope_a, std::initializer_list permissions_a) const; + void require_oneof (nanoapi::Envelope const & envelope_a, std::initializer_list alternative_permissions_a) const; + + nano::node & node; + nano::ipc::ipc_server & ipc_server; + std::weak_ptr subscriber; + }; +} +} diff --git a/nano/node/ipc/flatbuffers_handler.cpp b/nano/node/ipc/flatbuffers_handler.cpp new file mode 100644 index 00000000..f3fd2b35 --- /dev/null +++ b/nano/node/ipc/flatbuffers_handler.cpp @@ -0,0 +1,198 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace +{ +auto handler_map = nano::ipc::action_handler::handler_map (); + +/** + * A helper for when it's necessary to create a JSON error response manually + */ +std::string make_error_response (std::string const & error_message) +{ + std::ostringstream json; + json << R"json({"message_type": "Error", "message": {"code": 1, "message": ")json" + << error_message + << R"json("}})json"; + return json.str (); +} + +/** + * Returns the 'api/flatbuffers' directory, boost::none if not found. + * This searches the binary path as well as the parent (which is mostly useful for development) + */ +boost::optional get_api_path () +{ + auto parent_path = boost::dll::program_location ().parent_path (); + if (!boost::filesystem::exists (parent_path / "api" / "flatbuffers")) + { + // See if the parent directory has the api subdirectories + if (parent_path.has_parent_path ()) + { + parent_path = boost::dll::program_location ().parent_path ().parent_path (); + } + + if (!boost::filesystem::exists (parent_path / "api" / "flatbuffers")) + { + return boost::none; + } + } + return parent_path / "api" / "flatbuffers"; +} +} + +nano::ipc::flatbuffers_handler::flatbuffers_handler (nano::node & node_a, nano::ipc::ipc_server & ipc_server_a, std::shared_ptr const & subscriber_a, nano::ipc::ipc_config const & ipc_config_a) : +node (node_a), +ipc_server (ipc_server_a), +subscriber (subscriber_a), +ipc_config (ipc_config_a) +{ +} + +std::shared_ptr nano::ipc::flatbuffers_handler::make_flatbuffers_parser (nano::ipc::ipc_config const & ipc_config_a) +{ + auto parser (std::make_shared ()); + parser->opts.strict_json = true; + parser->opts.skip_unexpected_fields_in_json = ipc_config_a.flatbuffers.skip_unexpected_fields_in_json; + + auto api_path = get_api_path (); + if (!api_path) + { + throw nano::error ("Internal IPC error: unable to find api path"); + } + + const char * include_directories[] = { api_path->string ().c_str (), nullptr }; + std::string schemafile; + if (!flatbuffers::LoadFile ((*api_path / "nanoapi.fbs").string ().c_str (), false, &schemafile)) + { + throw nano::error ("Internal IPC error: unable to load schema file"); + } + + auto parse_success = parser->Parse (schemafile.c_str (), include_directories); + if (!parse_success) + { + std::string parser_error = "Internal IPC error: unable to parse schema file: "; + parser_error += parser->error_.c_str (); + throw nano::error (parser_error); + } + return parser; +} + +void nano::ipc::flatbuffers_handler::process_json (const uint8_t * message_buffer_a, size_t buffer_size_a, +std::function)> response_handler) +{ + try + { + if (!parser) + { + parser = make_flatbuffers_parser (ipc_config); + } + + // Convert request from JSON + auto body (std::string (reinterpret_cast (const_cast (message_buffer_a)), buffer_size_a)); + body += '\0'; + if (parser->Parse (reinterpret_cast (body.data ()))) + { + process (parser->builder_.GetBufferPointer (), parser->builder_.GetSize (), [parser = parser, response_handler](std::shared_ptr fbb) { + // Convert response to JSON + auto json (std::make_shared ()); + if (!flatbuffers::GenerateText (*parser, fbb->GetBufferPointer (), json.get ())) + { + throw nano::error ("Couldn't serialize response to JSON"); + } + + response_handler (json); + }); + } + else + { + std::string parser_error = "Invalid message format: "; + parser_error += parser->error_.c_str (); + throw nano::error (parser_error); + } + } + catch (nano::error const & err) + { + // Forces the parser construction to be retried as certain errors are + // recoverable (such path errors getting fixed by the user without a node restart) + parser = nullptr; + + // Convert error response to JSON. We must construct this manually since the exception + // may be parser related (such as not being able to load the schema) + response_handler (std::make_shared (make_error_response (err.get_message ()))); + } + catch (...) + { + std::cerr << "Unknown exception in " << __FUNCTION__ << std::endl; + response_handler (std::make_shared (make_error_response ("Unknown exception"))); + } +} + +void nano::ipc::flatbuffers_handler::process (const uint8_t * message_buffer_a, size_t buffer_size_a, +std::function)> response_handler) +{ + auto buffer_l (std::make_shared ()); + auto actionhandler (std::make_shared (node, ipc_server, subscriber, buffer_l)); + std::string correlationId = ""; + + // Find and call the action handler + try + { + // By default we verify the buffers, to make sure offsets reside inside the buffer. + // This brings the buffer into cache, making the overall verify+parse overhead low. + if (ipc_config.flatbuffers.verify_buffers) + { + auto verifier (flatbuffers::Verifier (message_buffer_a, buffer_size_a)); + if (!nanoapi::VerifyEnvelopeBuffer (verifier)) + { + throw nano::error ("Envelope buffer did not pass verifier"); + } + } + + auto incoming = nanoapi::GetEnvelope (message_buffer_a); + if (incoming == nullptr) + { + nano::error err ("Invalid message"); + actionhandler->make_error (err.error_code_as_int (), err.get_message ()); + response_handler (buffer_l); + return; + } + + auto handler_method = handler_map.find (incoming->message_type ()); + if (handler_method != handler_map.end ()) + { + if (incoming->correlation_id ()) + { + actionhandler->set_correlation_id (incoming->correlation_id ()->str ()); + } + handler_method->second (actionhandler.get (), *incoming); + } + else + { + nano::error err ("Unknown message type"); + actionhandler->make_error (err.error_code_as_int (), err.get_message ()); + } + } + catch (nano::error const & err) + { + actionhandler->make_error (err.error_code_as_int (), err.get_message ()); + } + + response_handler (buffer_l); +} diff --git a/nano/node/ipc/flatbuffers_handler.hpp b/nano/node/ipc/flatbuffers_handler.hpp new file mode 100644 index 00000000..59657661 --- /dev/null +++ b/nano/node/ipc/flatbuffers_handler.hpp @@ -0,0 +1,65 @@ +#pragma once + +#include +#include +#include +#include + +namespace flatbuffers +{ +class FlatBufferBuilder; +class Parser; +} +namespace nano +{ +class node; +namespace ipc +{ + class subscriber; + class ipc_config; + class ipc_server; + /** + * This handler sits between the IPC server and the action handler. Its job is to deserialize + * Flatbuffers in binary and json formats into high level message objects. These messages are + * then used to dispatch the correct action handler. + * @throws Methods of this class throw nano::error on failure. + * @note This class is not thread safe; use one instance per session/thread. + */ + class flatbuffers_handler final : public std::enable_shared_from_this + { + public: + /** + * Constructs the handler. + * @param node_a Node + * @param subscriber Subscriber instance + * @param ipc_server_a Optional IPC server (may be nullptr, i.e when calling through the RPC gateway) + */ + flatbuffers_handler (nano::node & node_a, nano::ipc::ipc_server & ipc_server_a, std::shared_ptr const & subscriber_a, nano::ipc::ipc_config const & ipc_config_a); + + /** + * Deserialize flatbuffer message, look up and call the action handler, then call the response handler with a + * FlatBufferBuilder to allow for zero-copy transfers of data. + * @param response_handler Receives a shared pointer to the flatbuffer builder, from which the buffer and size can be queried + * @throw Throws std:runtime_error on deserialization or processing errors + */ + void process (const uint8_t * message_buffer_a, size_t buffer_size_a, std::function)> response_handler); + + /** + * Parses a JSON encoded requests into Flatbuffer format, calls process(), yields the result as a JSON string + */ + void process_json (const uint8_t * message_buffer_a, size_t buffer_size_a, std::function)> response_handler); + + /** + * Creates a Flatbuffers parser with the schema preparsed. This can then be used to parse and produce JSON. + */ + static std::shared_ptr make_flatbuffers_parser (nano::ipc::ipc_config const & ipc_config_a); + + private: + std::shared_ptr parser; + nano::node & node; + nano::ipc::ipc_server & ipc_server; + std::weak_ptr subscriber; + nano::ipc::ipc_config const & ipc_config; + }; +} +} diff --git a/nano/node/ipc/flatbuffers_util.cpp b/nano/node/ipc/flatbuffers_util.cpp new file mode 100644 index 00000000..4ab2358c --- /dev/null +++ b/nano/node/ipc/flatbuffers_util.cpp @@ -0,0 +1,120 @@ +#include +#include +#include +#include + +std::unique_ptr nano::ipc::flatbuffers_builder::from (nano::state_block const & block_a, nano::amount const & amount_a, bool is_state_send_a) +{ + static nano::network_params params; + auto block (std::make_unique ()); + block->account = block_a.account ().to_account (); + block->hash = block_a.hash ().to_string (); + block->previous = block_a.previous ().to_string (); + block->representative = block_a.representative ().to_account (); + block->balance = block_a.balance ().to_string_dec (); + block->link = block_a.link ().to_string (); + block->link_as_account = block_a.link ().to_account (); + block_a.signature.encode_hex (block->signature); + block->work = nano::to_string_hex (block_a.work); + + if (is_state_send_a) + { + block->subtype = nanoapi::BlockSubType::BlockSubType_send; + } + else if (block_a.link ().is_zero ()) + { + block->subtype = nanoapi::BlockSubType::BlockSubType_change; + } + else if (amount_a == 0 && params.ledger.epochs.is_epoch_link (block_a.link ())) + { + block->subtype = nanoapi::BlockSubType::BlockSubType_epoch; + } + else + { + block->subtype = nanoapi::BlockSubType::BlockSubType_receive; + } + return block; +} + +std::unique_ptr nano::ipc::flatbuffers_builder::from (nano::send_block const & block_a) +{ + auto block (std::make_unique ()); + block->hash = block_a.hash ().to_string (); + block->balance = block_a.balance ().to_string_dec (); + block->destination = block_a.hashables.destination.to_account (); + block->previous = block_a.previous ().to_string (); + block_a.signature.encode_hex (block->signature); + block->work = nano::to_string_hex (block_a.work); + return block; +} + +std::unique_ptr nano::ipc::flatbuffers_builder::from (nano::receive_block const & block_a) +{ + auto block (std::make_unique ()); + block->hash = block_a.hash ().to_string (); + block->source = block_a.source ().to_string (); + block->previous = block_a.previous ().to_string (); + block_a.signature.encode_hex (block->signature); + block->work = nano::to_string_hex (block_a.work); + return block; +} + +std::unique_ptr nano::ipc::flatbuffers_builder::from (nano::open_block const & block_a) +{ + auto block (std::make_unique ()); + block->hash = block_a.hash ().to_string (); + block->source = block_a.source ().to_string (); + block->account = block_a.account ().to_account (); + block->representative = block_a.representative ().to_account (); + block_a.signature.encode_hex (block->signature); + block->work = nano::to_string_hex (block_a.work); + return block; +} + +std::unique_ptr nano::ipc::flatbuffers_builder::from (nano::change_block const & block_a) +{ + auto block (std::make_unique ()); + block->hash = block_a.hash ().to_string (); + block->previous = block_a.previous ().to_string (); + block->representative = block_a.representative ().to_account (); + block_a.signature.encode_hex (block->signature); + block->work = nano::to_string_hex (block_a.work); + return block; +} + +nanoapi::BlockUnion nano::ipc::flatbuffers_builder::block_to_union (nano::block const & block_a, nano::amount const & amount_a, bool is_state_send_a) +{ + nanoapi::BlockUnion u; + switch (block_a.type ()) + { + case nano::block_type::state: + { + u.Set (*from (dynamic_cast (block_a), amount_a, is_state_send_a)); + break; + } + case nano::block_type::send: + { + u.Set (*from (dynamic_cast (block_a))); + break; + } + case nano::block_type::receive: + { + u.Set (*from (dynamic_cast (block_a))); + break; + } + case nano::block_type::open: + { + u.Set (*from (dynamic_cast (block_a))); + break; + } + case nano::block_type::change: + { + u.Set (*from (dynamic_cast (block_a))); + break; + } + + default: + assert (false); + } + return u; +} diff --git a/nano/node/ipc/flatbuffers_util.hpp b/nano/node/ipc/flatbuffers_util.hpp new file mode 100644 index 00000000..c1236a94 --- /dev/null +++ b/nano/node/ipc/flatbuffers_util.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include + +#include + +namespace nano +{ +class amount; +class block; +class send_block; +class receive_block; +class change_block; +class open_block; +class state_block; +namespace ipc +{ + /** + * Utilities to convert between blocks and Flatbuffers equivalents + */ + class flatbuffers_builder + { + public: + static nanoapi::BlockUnion block_to_union (nano::block const & block_a, nano::amount const & amount_a, bool is_state_send_a = false); + static std::unique_ptr from (nano::state_block const & block_a, nano::amount const & amount_a, bool is_state_send_a); + static std::unique_ptr from (nano::send_block const & block_a); + static std::unique_ptr from (nano::receive_block const & block_a); + static std::unique_ptr from (nano::open_block const & block_a); + static std::unique_ptr from (nano::change_block const & block_a); + }; +} +} diff --git a/nano/node/ipc/ipc_access_config.cpp b/nano/node/ipc/ipc_access_config.cpp new file mode 100644 index 00000000..0f1b2f06 --- /dev/null +++ b/nano/node/ipc/ipc_access_config.cpp @@ -0,0 +1,345 @@ +#include +#include + +#include + +namespace +{ +/** Convert permission to strings. This is how permissions appears in config-access.toml */ +std::string to_string (nano::ipc::access_permission permission) +{ + switch (permission) + { + case nano::ipc::access_permission::invalid: + return "invalid"; + case nano::ipc::access_permission::unrestricted: + return "unrestricted"; + case nano::ipc::access_permission::api_service_register: + return "api_service_register"; + case nano::ipc::access_permission::api_service_stop: + return "api_service_stop"; + case nano::ipc::access_permission::api_account_weight: + return "api_account_weight"; + case nano::ipc::access_permission::api_topic_confirmation: + return "api_topic_confirmation"; + case nano::ipc::access_permission::api_topic_service_stop: + return "api_topic_service_stop"; + case nano::ipc::access_permission::account_query: + return "account_query"; + case nano::ipc::access_permission::epoch_upgrade: + return "epoch_upgrade"; + case nano::ipc::access_permission::service: + return "service"; + case nano::ipc::access_permission::wallet: + return "wallet"; + case nano::ipc::access_permission::wallet_read: + return "wallet_read"; + case nano::ipc::access_permission::wallet_write: + return "wallet_write"; + case nano::ipc::access_permission::wallet_seed_change: + return "wallet_seed_change"; + } + + return "invalid"; +} + +/** Convert string to permission */ +nano::ipc::access_permission from_string (std::string permission) +{ + if (permission == "unrestricted") + return nano::ipc::access_permission::unrestricted; + if (permission == "api_account_weight") + return nano::ipc::access_permission::api_account_weight; + if (permission == "api_service_register") + return nano::ipc::access_permission::api_service_register; + if (permission == "api_service_stop") + return nano::ipc::access_permission::api_service_stop; + if (permission == "api_topic_service_stop") + return nano::ipc::access_permission::api_topic_service_stop; + if (permission == "api_topic_confirmation") + return nano::ipc::access_permission::api_topic_confirmation; + if (permission == "account_query") + return nano::ipc::access_permission::account_query; + if (permission == "epoch_upgrade") + return nano::ipc::access_permission::epoch_upgrade; + if (permission == "service") + return nano::ipc::access_permission::service; + if (permission == "wallet") + return nano::ipc::access_permission::wallet; + if (permission == "wallet_read") + return nano::ipc::access_permission::wallet_read; + if (permission == "wallet_write") + return nano::ipc::access_permission::wallet_write; + if (permission == "wallet_seed_change") + return nano::ipc::access_permission::wallet_seed_change; + + return nano::ipc::access_permission::invalid; +} +} + +void nano::ipc::access::set_effective_permissions (nano::ipc::access_subject & subject_a, std::shared_ptr const & config_subject_a) +{ + std::string allow_l (config_subject_a->get_as ("allow").value_or ("")); + std::vector allow_strings_l; + boost::split (allow_strings_l, allow_l, boost::is_any_of (",")); + for (auto const & permission : allow_strings_l) + { + if (!permission.empty ()) + { + auto permission_enum = from_string (boost::trim_copy (permission)); + if (permission_enum != nano::ipc::access_permission::invalid) + { + subject_a.permissions.insert (permission_enum); + } + } + } + + std::string deny_l (config_subject_a->get_as ("deny").value_or ("")); + std::vector deny_strings_l; + boost::split (deny_strings_l, deny_l, boost::is_any_of (",")); + for (auto const & permission : deny_strings_l) + { + if (!permission.empty ()) + { + auto permission_enum = from_string (boost::trim_copy (permission)); + if (permission_enum != nano::ipc::access_permission::invalid) + { + subject_a.permissions.erase (permission_enum); + } + } + } +} + +void nano::ipc::access::clear () +{ + users.clear (); + roles.clear (); + + // Create default user. The node operator can add additional roles + // and permissions to the default user by adding a toml [[user]] entry + // without an id (or set it to the empty string). + // The default permissions can be overriden by marking the default user + // as bare, and then set specific permissions. + default_user.clear (); + default_user.id = ""; + + // The default set of permissions. A new insert should be made as new safe + // api's or resource permissions are made. + default_user.permissions.insert (nano::ipc::access_permission::api_account_weight); +} + +nano::error nano::ipc::access::deserialize_toml (nano::tomlconfig & toml) +{ + nano::unique_lock lock (mutex); + clear (); + + nano::error error; + if (toml.has_key ("role")) + { + auto get_role = [this](std::shared_ptr const & role_a) { + nano::ipc::access_role role; + std::string id_l (role_a->get_as ("id").value_or ("")); + role.id = id_l; + set_effective_permissions (role, role_a); + return role; + }; + + auto role_l = toml.get_tree ()->get ("role"); + if (role_l->is_table ()) + { + auto role = get_role (role_l->as_table ()); + if (role_l->as_table ()->contains ("deny")) + { + error.set ("Only users can have deny entries"); + } + else + { + roles.emplace (role.id, role); + } + } + else if (role_l->is_table_array ()) + { + for (auto & table : *role_l->as_table_array ()) + { + if (table->contains ("deny")) + { + error.set ("Only users can have deny entries"); + } + + auto role = get_role (table); + roles.emplace (role.id, role); + } + } + } + + if (!error && toml.has_key ("user")) + { + auto get_user = [this, &error](std::shared_ptr const & user_a) { + nano::ipc::access_user user; + user.id = user_a->get_as ("id").value_or (""); + // Check bare flag. The tomlconfig parser stringifies values, so we must retrieve as string. + bool is_bare = user_a->get_as ("bare").value_or ("false") == "true"; + + // Adopt all permissions from the roles. This must be done before setting user permissions, since + // the user config may add deny-entries. + std::string roles_l (user_a->get_as ("roles").value_or ("")); + std::vector role_strings_l; + boost::split (role_strings_l, roles_l, boost::is_any_of (",")); + for (auto const & role : role_strings_l) + { + auto role_id (boost::trim_copy (role)); + if (!role_id.empty ()) + { + auto match = roles.find (role_id); + if (match != roles.end ()) + { + user.permissions.insert (match->second.permissions.begin (), match->second.permissions.end ()); + } + else + { + error.set ("Unknown role: " + role_id); + } + } + } + + // A user with the bare flag does not inherit default permissions + if (!is_bare) + { + user.permissions.insert (default_user.permissions.begin (), default_user.permissions.end ()); + } + + set_effective_permissions (user, user_a); + + return user; + }; + + auto user_l = toml.get_tree ()->get ("user"); + if (user_l->is_table ()) + { + auto user = get_user (user_l->as_table ()); + users.emplace (user.id, user); + } + else if (user_l->is_table_array ()) + { + for (auto & table : *user_l->as_table_array ()) + { + auto user = get_user (table); + if (user.id.empty () && users.size () > 0) + { + // This is a requirement because other users inherit permissions from the default user + error.set ("Changes to the default user must appear before other users in the access config file"); + break; + } + users.emplace (user.id, user); + } + } + } + + // Add default user if it wasn't present in the config file + if (users.find ("") == users.end ()) + { + users.emplace (default_user.id, default_user); + } + + return error; +} + +bool nano::ipc::access::has_access (std::string const & credentials_a, nano::ipc::access_permission permssion_a) const +{ + nano::unique_lock lock (mutex); + bool permitted = false; + auto user = users.find (credentials_a); + if (user != users.end ()) + { + permitted = user->second.permissions.find (permssion_a) != user->second.permissions.end (); + if (!permitted) + { + permitted = user->second.permissions.find (nano::ipc::access_permission::unrestricted) != user->second.permissions.end (); + } + } + return permitted; +} + +bool nano::ipc::access::has_access_to_all (std::string const & credentials_a, std::initializer_list permissions_a) const +{ + nano::unique_lock lock (mutex); + bool permitted = false; + auto user = users.find (credentials_a); + if (user != users.end ()) + { + for (auto permission : permissions_a) + { + permitted = user->second.permissions.find (permission) != user->second.permissions.end (); + if (!permitted) + { + break; + } + } + } + return permitted; +} + +bool nano::ipc::access::has_access_to_oneof (std::string const & credentials_a, std::initializer_list permissions_a) const +{ + nano::unique_lock lock (mutex); + bool permitted = false; + auto user = users.find (credentials_a); + if (user != users.end ()) + { + for (auto permission : permissions_a) + { + permitted = user->second.permissions.find (permission) != user->second.permissions.end (); + if (permitted) + { + break; + } + } + if (!permitted) + { + permitted = user->second.permissions.find (nano::ipc::access_permission::unrestricted) != user->second.permissions.end (); + } + } + return permitted; +} + +void nano::ipc::access_subject::clear () +{ + permissions.clear (); +} + +void nano::ipc::access_user::clear () +{ + access_subject::clear (); + roles.clear (); +} + +namespace nano +{ +namespace ipc +{ + nano::error read_access_config_toml (boost::filesystem::path const & data_path_a, nano::ipc::access & config_a) + { + nano::error error; + auto toml_config_path = nano::get_access_toml_config_path (data_path_a); + + nano::tomlconfig toml; + if (boost::filesystem::exists (toml_config_path)) + { + error = toml.read (toml_config_path); + } + else + { + std::stringstream config_overrides_stream; + config_overrides_stream << std::endl; + toml.read (config_overrides_stream); + } + + if (!error) + { + error = config_a.deserialize_toml (toml); + } + + return error; + } +} +} diff --git a/nano/node/ipc/ipc_access_config.hpp b/nano/node/ipc/ipc_access_config.hpp new file mode 100644 index 00000000..076c4d1d --- /dev/null +++ b/nano/node/ipc/ipc_access_config.hpp @@ -0,0 +1,133 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +namespace boost +{ +namespace filesystem +{ + class path; +} +} +namespace cpptoml +{ +class table; +} + +namespace nano +{ +class tomlconfig; +namespace ipc +{ + struct enum_hash + { + template + constexpr typename std::enable_if::value, std::size_t>::type + operator() (T s) const noexcept + { + return static_cast (s); + } + }; + + /** + * Permissions come in roughly two forms: api permissions (one for every api we expose) and + * higher level resource permissions. We define a permission per api because a common use case is to + * allow a specific set of RPCs. The higher level resource permissions makes it easier to + * grant access to groups of operations or resources. An API implementation will typically check + * against the corresponding api permission (such as api_account_weight), but may also allow + * resource permissions (such as account_query). + */ + enum class access_permission + { + invalid, + /** Unrestricted access to the node, suitable for debugging and development */ + unrestricted, + api_account_weight, + api_service_register, + api_service_stop, + api_topic_service_stop, + api_topic_confirmation, + /** Query account information */ + account_query, + /** Epoch upgrade */ + epoch_upgrade, + /** All service operations */ + service, + /** All wallet operations */ + wallet, + /** Non-mutable wallet operations */ + wallet_read, + /** Mutable wallet operations */ + wallet_write, + /** Seed change */ + wallet_seed_change + }; + + /** A subject is a user or role with a set of permissions */ + class access_subject + { + public: + std::unordered_set permissions; + virtual ~access_subject () = default; + virtual void clear (); + }; + + /** Permissions can be organized into roles */ + class access_role final : public access_subject + { + public: + std::string id; + }; + + /** A user with credentials and a set of permissions (either directly or through roles) */ + class access_user final : public access_subject + { + public: + /* User credentials, serving as the id */ + std::string id; + std::vector roles; + void clear () override; + }; + + /** + * Constructs a user/role/permission domain model from config-access.toml, and + * allows permissions for a user to be checked. + * @note This class is thread safe + */ + class access final + { + public: + bool has_access (std::string const & credentials_a, nano::ipc::access_permission permission_a) const; + bool has_access_to_all (std::string const & credentials_a, std::initializer_list permissions_a) const; + bool has_access_to_oneof (std::string const & credentials_a, std::initializer_list permissions_a) const; + nano::error deserialize_toml (nano::tomlconfig &); + + private: + /** Process allow and deny entries for the given subject */ + void set_effective_permissions (nano::ipc::access_subject & subject_a, std::shared_ptr const & config_subject_a); + + /** Clear current users, roles and default permissions */ + void clear (); + + std::unordered_map users; + std::unordered_map roles; + + /** + * Default user with a basic set of permissions. Additional users will derive the permissions + * from the default user (unless "bare" is true in the access config file) + */ + access_user default_user; + /** The config can be externally reloaded and concurrently accessed */ + mutable std::mutex mutex; + }; + + nano::error read_access_config_toml (boost::filesystem::path const & data_path_a, nano::ipc::access & config_a); +} +} diff --git a/nano/node/ipc/ipc_broker.cpp b/nano/node/ipc/ipc_broker.cpp new file mode 100644 index 00000000..46ccdf0b --- /dev/null +++ b/nano/node/ipc/ipc_broker.cpp @@ -0,0 +1,259 @@ +#include +#include +#include +#include +#include +#include + +nano::ipc::broker::broker (nano::node & node_a) : +node (node_a) +{ +} + +std::shared_ptr nano::ipc::subscriber::get_parser (nano::ipc::ipc_config const & ipc_config_a) +{ + if (!parser) + { + parser = nano::ipc::flatbuffers_handler::make_flatbuffers_parser (ipc_config_a); + } + return parser; +} + +void nano::ipc::broker::start () +{ + node.observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) { + assert (status_a.type != nano::election_status_type::ongoing); + + try + { + // The subscriber(s) may be gone after the count check, but the only consequence + // is that broadcast is called only to not find any live sessions. + if (confirmation_subscriber_count () > 0) + { + auto confirmation (std::make_shared ()); + + confirmation->account = account_a.to_account (); + confirmation->amount = amount_a.to_string_dec (); + switch (status_a.type) + { + case nano::election_status_type::active_confirmed_quorum: + confirmation->confirmation_type = nanoapi::TopicConfirmationType::TopicConfirmationType_active_quorum; + break; + case nano::election_status_type::active_confirmation_height: + confirmation->confirmation_type = nanoapi::TopicConfirmationType::TopicConfirmationType_active_confirmation_height; + break; + case nano::election_status_type::inactive_confirmation_height: + confirmation->confirmation_type = nanoapi::TopicConfirmationType::TopicConfirmationType_inactive; + break; + default: + assert (false); + break; + }; + confirmation->confirmation_type = nanoapi::TopicConfirmationType::TopicConfirmationType_active_quorum; + confirmation->block = nano::ipc::flatbuffers_builder::block_to_union (*status_a.winner, amount_a, is_state_send_a); + confirmation->election_info = std::make_unique (); + confirmation->election_info->duration = status_a.election_duration.count (); + confirmation->election_info->time = status_a.election_end.count (); + confirmation->election_info->tally = status_a.tally.to_string_dec (); + confirmation->election_info->block_count = status_a.block_count; + confirmation->election_info->voter_count = status_a.voter_count; + confirmation->election_info->request_count = status_a.confirmation_request_count; + + broadcast (confirmation); + } + } + catch (nano::error const & err) + { + this->node.logger.always_log ("IPC: could not broadcast message: ", err.get_message ()); + } + }); +} + +template +void subscribe_or_unsubscribe (nano::logger_mt & logger, COLL & subscriber_collection, std::weak_ptr const & subscriber_a, TOPIC_TYPE topic_a) +{ + // Evict subscribers from dead sessions. Also remove current subscriber if unsubscribing. + subscriber_collection.erase (std::remove_if (subscriber_collection.begin (), subscriber_collection.end (), + [& logger = logger, topic_a, subscriber_a](auto & sub) { + bool remove = false; + auto subscriber_l = sub.subscriber.lock (); + if (subscriber_l) + { + if (auto calling_subscriber_l = subscriber_a.lock ()) + { + remove = topic_a->unsubscribe && subscriber_l->get_id () == calling_subscriber_l->get_id (); + if (remove) + { + logger.always_log ("IPC: unsubscription from subscriber #", calling_subscriber_l->get_id ()); + } + } + } + else + { + remove = true; + } + return remove; + }), + subscriber_collection.end ()); + + if (!topic_a->unsubscribe) + { + subscriber_collection.emplace_back (subscriber_a, topic_a); + } +} + +void nano::ipc::broker::subscribe (std::weak_ptr const & subscriber_a, std::shared_ptr const & confirmation_a) +{ + auto subscribers = confirmation_subscribers.lock (); + subscribe_or_unsubscribe (node.logger, subscribers.get (), subscriber_a, confirmation_a); +} + +void nano::ipc::broker::broadcast (std::shared_ptr const & confirmation_a) +{ + using Filter = nanoapi::TopicConfirmationTypeFilter; + decltype (confirmation_a->election_info) election_info; + nanoapi::BlockUnion block; + auto itr (confirmation_subscribers->begin ()); + while (itr != confirmation_subscribers->end ()) + { + if (auto subscriber_l = itr->subscriber.lock ()) + { + auto should_filter = [this, &itr, confirmation_a]() { + assert (itr->topic->options != nullptr); + auto conf_filter (itr->topic->options->confirmation_type_filter); + + bool should_filter_conf_type_l (true); + bool all_filter = conf_filter == Filter::TopicConfirmationTypeFilter_all; + bool inactive_filter = conf_filter == Filter::TopicConfirmationTypeFilter_inactive; + bool active_filter = conf_filter == Filter::TopicConfirmationTypeFilter_active || conf_filter == Filter::TopicConfirmationTypeFilter_active_quorum || conf_filter == Filter::TopicConfirmationTypeFilter_active_confirmation_height; + + if ((confirmation_a->confirmation_type == nanoapi::TopicConfirmationType::TopicConfirmationType_active_quorum || confirmation_a->confirmation_type == nanoapi::TopicConfirmationType::TopicConfirmationType_active_confirmation_height) && (all_filter || active_filter)) + { + should_filter_conf_type_l = false; + } + else if (confirmation_a->confirmation_type == nanoapi::TopicConfirmationType::TopicConfirmationType_inactive && (all_filter || inactive_filter)) + { + should_filter_conf_type_l = false; + } + + bool should_filter_account_l (itr->topic->options->all_local_accounts || !itr->topic->options->accounts.empty ()); + auto state (confirmation_a->block.AsBlockState ()); + if (state && !should_filter_conf_type_l) + { + if (itr->topic->options->all_local_accounts) + { + auto transaction_l (this->node.wallets.tx_begin_read ()); + nano::account source_l (0), destination_l (0); + auto decode_source_ok_l (!source_l.decode_account (state->account)); + auto decode_destination_ok_l (!destination_l.decode_account (state->link_as_account)); + (void)decode_source_ok_l; + (void)decode_destination_ok_l; + assert (decode_source_ok_l && decode_destination_ok_l); + if (this->node.wallets.exists (transaction_l, source_l) || this->node.wallets.exists (transaction_l, destination_l)) + { + should_filter_account_l = false; + } + } + + if (std::find (itr->topic->options->accounts.begin (), itr->topic->options->accounts.end (), state->account) != itr->topic->options->accounts.end () || std::find (itr->topic->options->accounts.begin (), itr->topic->options->accounts.end (), state->link_as_account) != itr->topic->options->accounts.end ()) + { + should_filter_account_l = false; + } + } + + return should_filter_conf_type_l || should_filter_account_l; + }; + // Apply any filters + auto & options (itr->topic->options); + if (options) + { + if (!options->include_election_info) + { + election_info = std::move (confirmation_a->election_info); + confirmation_a->election_info = nullptr; + } + if (!options->include_block) + { + block = confirmation_a->block; + confirmation_a->block.Reset (); + } + } + if (!options || !should_filter ()) + { + auto fb (nano::ipc::flatbuffer_producer::make_buffer (*confirmation_a)); + + if (subscriber_l->get_active_encoding () == nano::ipc::payload_encoding::flatbuffers_json) + { + auto parser (subscriber_l->get_parser (node.config.ipc_config)); + + // Convert response to JSON + auto json (std::make_shared ()); + if (!flatbuffers::GenerateText (*parser, fb->GetBufferPointer (), json.get ())) + { + throw nano::error ("Couldn't serialize response to JSON"); + } + + subscriber_l->async_send_message (reinterpret_cast (json->data ()), json->size (), [json](const nano::error & err) {}); + } + else + { + subscriber_l->async_send_message (fb->GetBufferPointer (), fb->GetSize (), [fb](const nano::error & err) {}); + } + } + + // Restore full object, the next subscriber may request it + if (election_info) + { + confirmation_a->election_info = std::move (election_info); + } + if (block.type != nanoapi::Block::Block_NONE) + { + confirmation_a->block = block; + } + + ++itr; + } + else + { + itr = confirmation_subscribers->erase (itr); + } + } +} + +size_t nano::ipc::broker::confirmation_subscriber_count () const +{ + return confirmation_subscribers->size (); +} + +void nano::ipc::broker::service_register (std::string const & service_name_a, std::weak_ptr const & subscriber_a) +{ + if (auto subscriber_l = subscriber_a.lock ()) + { + subscriber_l->set_service_name (service_name_a); + } +} + +void nano::ipc::broker::service_stop (std::string const & service_name_a) +{ + auto subscribers = service_stop_subscribers.lock (); + for (auto & subcription : subscribers.get ()) + { + if (auto subscriber_l = subcription.subscriber.lock ()) + { + if (subscriber_l->get_service_name () == service_name_a) + { + nanoapi::EventServiceStopT event_stop; + auto fb (nano::ipc::flatbuffer_producer::make_buffer (event_stop)); + subscriber_l->async_send_message (fb->GetBufferPointer (), fb->GetSize (), [fb](const nano::error & err) {}); + + break; + } + } + } +} + +void nano::ipc::broker::subscribe (std::weak_ptr const & subscriber_a, std::shared_ptr const & service_stop_a) +{ + auto subscribers = service_stop_subscribers.lock (); + subscribe_or_unsubscribe (node.logger, subscribers.get (), subscriber_a, service_stop_a); +} diff --git a/nano/node/ipc/ipc_broker.hpp b/nano/node/ipc/ipc_broker.hpp new file mode 100644 index 00000000..84b1a3bd --- /dev/null +++ b/nano/node/ipc/ipc_broker.hpp @@ -0,0 +1,104 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +namespace flatbuffers +{ +class Parser; +} +namespace nano +{ +class node; +class error; +namespace ipc +{ + class ipc_config; + /** + * A subscriber represents a live session, and is weakly referenced nano::ipc::subscription whenever a subscription is made. + * This construction helps making the session implementation opaque to clients. + */ + class subscriber + { + public: + virtual ~subscriber () = default; + + /** + * Send message payload to the client. The implementation will prepend the bigendian length. + * @param data_a The caller must ensure the lifetime is extended until the completion handler is called, such as through a lambda capture. + * @param length_a Length of payload message in bytes + * @param broadcast_completion_handler_a Called once sending is completed + */ + virtual void async_send_message (uint8_t const * data_a, size_t length_a, std::function broadcast_completion_handler_a) = 0; + /** Returns the unique id of the associated session */ + virtual uint64_t get_id () const = 0; + /** Returns the service name associated with the session */ + virtual std::string get_service_name () const = 0; + /** Sets the service name associated with the session */ + virtual void set_service_name (std::string const & service_name_a) = 0; + /** Returns the session's active payload encoding */ + virtual nano::ipc::payload_encoding get_active_encoding () const = 0; + + /** Get flatbuffer parser instance for this subscriber; create it if necessary */ + std::shared_ptr get_parser (nano::ipc::ipc_config const & ipc_config_a); + + private: + std::shared_ptr parser; + }; + + /** + * Subscriptions are added to the broker whenever a topic message is sent from a client. + * The subscription is removed when the client unsubscribes, or lazily removed after the + * session is closed. + */ + template + class subscription final + { + public: + subscription (std::weak_ptr const & subscriber_a, std::shared_ptr const & topic_a) : + subscriber (subscriber_a), topic (topic_a) + { + } + + std::weak_ptr subscriber; + std::shared_ptr topic; + }; + + /** + * The broker manages subscribers and performs message broadcasting + * @note Add subscribe overloads for new topics + */ + class broker final + { + public: + broker (nano::node & node_a); + /** Starts the broker by setting up observers */ + void start (); + /** Subscribe to block confirmations */ + void subscribe (std::weak_ptr const & subscriber_a, std::shared_ptr const & confirmation_a); + /** Subscribe to EventServiceStop notifications for \p subscriber_a. The subscriber must first have called ServiceRegister. */ + void subscribe (std::weak_ptr const & subscriber_a, std::shared_ptr const & service_stop_a); + + /** Returns the number of confirmation subscribers */ + size_t confirmation_subscriber_count () const; + /** Associate the service name with the subscriber */ + void service_register (std::string const & service_name_a, std::weak_ptr const & subscriber_a); + /** Sends a notification to the session associated with the given service (if the session has subscribed to TopicServiceStop) */ + void service_stop (std::string const & service_name_a); + + private: + /** Broadcast block confirmations */ + void broadcast (std::shared_ptr const & confirmation_a); + + nano::node & node; + mutable nano::locked>> confirmation_subscribers; + mutable nano::locked>> service_stop_subscribers; + }; +} +} diff --git a/nano/node/ipcconfig.cpp b/nano/node/ipc/ipc_config.cpp similarity index 85% rename from nano/node/ipcconfig.cpp rename to nano/node/ipc/ipc_config.cpp index 4180e40b..9bfff2c2 100644 --- a/nano/node/ipcconfig.cpp +++ b/nano/node/ipc/ipc_config.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include nano::error nano::ipc::ipc_config::serialize_toml (nano::tomlconfig & toml) const { @@ -25,6 +25,12 @@ nano::error nano::ipc::ipc_config::serialize_toml (nano::tomlconfig & toml) cons domain_l.put ("path", transport_domain.path, "Path to the local domain socket.\ntype:string"); domain_l.put ("io_timeout", transport_domain.io_timeout, "Timeout for requests.\ntype:seconds"); toml.put_child ("local", domain_l); + + nano::tomlconfig flatbuffers_l; + flatbuffers_l.put ("skip_unexpected_fields_in_json", flatbuffers.skip_unexpected_fields_in_json, "Allow client to send unknown fields in json messages. These will be ignored.\ntype:bool"); + flatbuffers_l.put ("verify_buffers", flatbuffers.verify_buffers, "Verify that the buffer is valid before parsing. This is recommended when receiving data from untrusted sources.\ntype:bool"); + toml.put_child ("flatbuffers", flatbuffers_l); + return toml.get_error (); } @@ -50,6 +56,13 @@ nano::error nano::ipc::ipc_config::deserialize_toml (nano::tomlconfig & toml) domain_l->get ("io_timeout", transport_domain.io_timeout); } + auto flatbuffers_l (toml.get_optional_child ("flatbuffers")); + if (flatbuffers_l) + { + flatbuffers_l->get ("skip_unexpected_fields_in_json", flatbuffers.skip_unexpected_fields_in_json); + flatbuffers_l->get ("verify_buffers", flatbuffers.verify_buffers); + } + return toml.get_error (); } diff --git a/nano/node/ipcconfig.hpp b/nano/node/ipc/ipc_config.hpp similarity index 84% rename from nano/node/ipcconfig.hpp rename to nano/node/ipc/ipc_config.hpp index 7abf8f3a..a09b3b05 100644 --- a/nano/node/ipcconfig.hpp +++ b/nano/node/ipc/ipc_config.hpp @@ -22,6 +22,16 @@ namespace ipc long io_threads{ -1 }; }; + /** + * Flatbuffers encoding config. See TOML serialization calls for details about each field. + */ + class ipc_config_flatbuffers final + { + public: + bool skip_unexpected_fields_in_json{ true }; + bool verify_buffers{ true }; + }; + /** Domain socket specific transport config */ class ipc_config_domain_socket : public ipc_config_transport { @@ -61,6 +71,7 @@ namespace ipc nano::error serialize_toml (nano::tomlconfig & toml) const; ipc_config_domain_socket transport_domain; ipc_config_tcp_socket transport_tcp; + ipc_config_flatbuffers flatbuffers; }; } } diff --git a/nano/node/ipc/ipc_server.cpp b/nano/node/ipc/ipc_server.cpp new file mode 100644 index 00000000..2dc1aece --- /dev/null +++ b/nano/node/ipc/ipc_server.cpp @@ -0,0 +1,659 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include + +using namespace boost::log; + +namespace +{ +/** + * A session manages an inbound connection over which messages are exchanged. + */ +template +class session final : public nano::ipc::socket_base, public std::enable_shared_from_this> +{ +public: + session (nano::ipc::ipc_server & server_a, boost::asio::io_context & io_ctx_a, nano::ipc::ipc_config_transport & config_transport_a) : + socket_base (io_ctx_a), + server (server_a), node (server_a.node), session_id (server_a.id_dispenser.fetch_add (1)), + io_ctx (io_ctx_a), strand (io_ctx_a.get_executor ()), socket (io_ctx_a), config_transport (config_transport_a) + { + if (node.config.logging.log_ipc ()) + { + node.logger.always_log ("IPC: created session with id: ", session_id.load ()); + } + } + + ~session () + { + close (); + } + + SOCKET_TYPE & get_socket () + { + return socket; + } + + std::shared_ptr get_subscriber () + { + class subscriber_impl final : public nano::ipc::subscriber, public std::enable_shared_from_this + { + public: + subscriber_impl (std::shared_ptr const & session_a) : + session_m (session_a) + { + } + virtual void async_send_message (uint8_t const * data_a, size_t length_a, std::function broadcast_completion_handler_a) override + { + if (auto session_l = session_m.lock ()) + { + auto big_endian_length = std::make_shared (boost::endian::native_to_big (static_cast (length_a))); + boost::array buffers = { + boost::asio::buffer (big_endian_length.get (), sizeof (std::uint32_t)), + boost::asio::buffer (data_a, length_a) + }; + + session_l->queued_write (buffers, [broadcast_completion_handler_a, big_endian_length](boost::system::error_code const & ec_a, size_t size_a) { + if (broadcast_completion_handler_a) + { + nano::error error_l (ec_a); + broadcast_completion_handler_a (error_l); + } + }); + } + } + + uint64_t get_id () const override + { + uint64_t id{ 0 }; + if (auto session_l = session_m.lock ()) + { + id = session_l->session_id; + } + return id; + } + + std::string get_service_name () const override + { + std::string name{ 0 }; + if (auto session_l = session_m.lock ()) + { + name = session_l->service_name; + } + return name; + } + + void set_service_name (std::string const & service_name_a) override + { + if (auto session_l = session_m.lock ()) + { + session_l->service_name = service_name_a; + } + } + + nano::ipc::payload_encoding get_active_encoding () const override + { + nano::ipc::payload_encoding encoding{ nano::ipc::payload_encoding::flatbuffers }; + if (auto session_l = session_m.lock ()) + { + encoding = session_l->active_encoding; + } + return encoding; + } + + private: + std::weak_ptr session_m; + }; + + static std::mutex subscriber_mutex; + nano::unique_lock lock (subscriber_mutex); + + if (!subscriber) + { + subscriber = std::make_shared (this->shared_from_this ()); + } + return subscriber; + } + + /** Write a fixed array of buffers through the queue. Once the last item is completed, the callback is invoked */ + template + void queued_write (boost::array & buffers, std::function callback_a) + { + auto this_l (this->shared_from_this ()); + boost::asio::post (strand, boost::asio::bind_executor (strand, [buffers, callback_a, this_l]() { + bool write_in_progress = !this_l->send_queue.empty (); + auto queue_size = this_l->send_queue.size (); + if (queue_size < this_l->queue_size_max) + { + for (size_t i = 0; i < N - 1; i++) + { + this_l->send_queue.emplace_back (queue_item{ buffers[i], nullptr }); + } + this_l->send_queue.emplace_back (queue_item{ buffers[N - 1], callback_a }); + } + if (!write_in_progress) + { + this_l->write_queued_messages (); + } + })); + } + + /** + * Write to underlying socket. Writes goes through a queue protected by the strand. Thus, this function + * can be called concurrently with other writes. + * @note This function explicitely doesn't use nano::shared_const_buffer, as buffers usually originate from Flatbuffers + * and copying into the shared_const_buffer vector would impose a significant overhead for large requests and responses. + */ + void queued_write (boost::asio::const_buffer const & buffer_a, std::function callback_a) + { + auto this_l (this->shared_from_this ()); + boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l]() { + bool write_in_progress = !this_l->send_queue.empty (); + auto queue_size = this_l->send_queue.size (); + if (queue_size < this_l->queue_size_max) + { + this_l->send_queue.emplace_back (queue_item{ buffer_a, callback_a }); + } + if (!write_in_progress) + { + this_l->write_queued_messages (); + } + })); + } + + void write_queued_messages () + { + std::weak_ptr this_w (this->shared_from_this ()); + auto msg (send_queue.front ()); + timer_start (std::chrono::seconds (config_transport.io_timeout)); + nano::unsafe_async_write (socket, msg.buffer, + boost::asio::bind_executor (strand, + [msg, this_w](boost::system::error_code ec, std::size_t size_a) { + if (auto this_l = this_w.lock ()) + { + this_l->timer_cancel (); + + if (msg.callback) + { + msg.callback (ec, size_a); + } + + this_l->send_queue.pop_front (); + if (!ec && !this_l->send_queue.empty ()) + { + this_l->write_queued_messages (); + } + } + })); + } + + /** + * Async read of exactly \p size_a bytes. The callback is invoked only when all the data is available and + * no error has occurred. On error, the error is logged, the read cycle stops and the session ends. Clients + * are expected to implement reconnect logic. + */ + void async_read_exactly (void * buff_a, size_t size_a, std::function const & callback_a) + { + async_read_exactly (buff_a, size_a, std::chrono::seconds (config_transport.io_timeout), callback_a); + } + + /** + * Async read of exactly \p size_a bytes and a specific \p timeout_a. + * @see async_read_exactly (void *, size_t, std::function) + */ + void async_read_exactly (void * buff_a, size_t size_a, std::chrono::seconds timeout_a, std::function const & callback_a) + { + timer_start (timeout_a); + auto this_l (this->shared_from_this ()); + boost::asio::async_read (socket, + boost::asio::buffer (buff_a, size_a), + boost::asio::transfer_exactly (size_a), + boost::asio::bind_executor (strand, + [this_l, callback_a](boost::system::error_code const & ec, size_t bytes_transferred_a) { + this_l->timer_cancel (); + if (ec == boost::asio::error::broken_pipe || ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset || ec == boost::asio::error::connection_refused) + { + if (this_l->node.config.logging.log_ipc ()) + { + this_l->node.logger.always_log (boost::str (boost::format ("IPC: error reading %1% ") % ec.message ())); + } + } + else if (bytes_transferred_a > 0) + { + callback_a (); + } + })); + } + + /** Handler for payload_encoding::json_legacy */ + void handle_json_query (bool allow_unsafe) + { + session_timer.restart (); + auto request_id_l (std::to_string (server.id_dispenser.fetch_add (1))); + + // This is called when nano::rpc_handler#process_request is done. We convert to + // json and write the response to the ipc socket with a length prefix. + auto this_l (this->shared_from_this ()); + auto response_handler_l ([this_l, request_id_l](std::string const & body) { + auto big = boost::endian::native_to_big (static_cast (body.size ())); + auto buffer (std::make_shared> ()); + buffer->insert (buffer->end (), reinterpret_cast (&big), reinterpret_cast (&big) + sizeof (std::uint32_t)); + buffer->insert (buffer->end (), body.begin (), body.end ()); + if (this_l->node.config.logging.log_ipc ()) + { + this_l->node.logger.always_log (boost::str (boost::format ("IPC/RPC request %1% completed in: %2% %3%") % request_id_l % this_l->session_timer.stop ().count () % this_l->session_timer.unit ())); + } + + this_l->timer_start (std::chrono::seconds (this_l->config_transport.io_timeout)); + this_l->queued_write (boost::asio::buffer (buffer->data (), buffer->size ()), [this_l, buffer](boost::system::error_code const & error_a, size_t size_a) { + this_l->timer_cancel (); + if (!error_a) + { + this_l->read_next_request (); + } + else if (this_l->node.config.logging.log_ipc ()) + { + this_l->node.logger.always_log ("IPC: Write failed: ", error_a.message ()); + } + }); + + // Do not call any member variables here (like session_timer) as it's possible that the next request may already be underway. + }); + + node.stats.inc (nano::stat::type::ipc, nano::stat::detail::invocations); + auto body (std::string (reinterpret_cast (buffer.data ()), buffer.size ())); + + // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler + auto handler (std::make_shared (node, server.node_rpc_config, body, response_handler_l, [& server = server]() { + server.stop (); + server.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (3), [& io_ctx = server.node.alarm.io_ctx]() { + io_ctx.stop (); + }); + })); + // For unsafe actions to be allowed, the unsafe encoding must be used AND the transport config must allow it + handler->process_request (allow_unsafe && config_transport.allow_unsafe); + } + + /** Async request reader */ + void read_next_request () + { + auto this_l = this->shared_from_this (); + + // Await next request indefinitely + buffer.resize (sizeof (buffer_size)); + async_read_exactly (buffer.data (), buffer.size (), std::chrono::seconds::max (), [this_l]() { + auto encoding (this_l->buffer[nano::ipc::preamble_offset::encoding]); + this_l->active_encoding = static_cast (encoding); + if (this_l->buffer[nano::ipc::preamble_offset::lead] != 'N' || this_l->buffer[nano::ipc::preamble_offset::reserved_1] != 0 || this_l->buffer[nano::ipc::preamble_offset::reserved_2] != 0) + { + if (this_l->node.config.logging.log_ipc ()) + { + this_l->node.logger.always_log ("IPC: Invalid preamble"); + } + } + else if (encoding == static_cast (nano::ipc::payload_encoding::json_v1) || encoding == static_cast (nano::ipc::payload_encoding::json_v1_unsafe)) + { + auto allow_unsafe (encoding == static_cast (nano::ipc::payload_encoding::json_v1_unsafe)); + // Length of payload + this_l->async_read_exactly (&this_l->buffer_size, sizeof (this_l->buffer_size), [this_l, allow_unsafe]() { + boost::endian::big_to_native_inplace (this_l->buffer_size); + this_l->buffer.resize (this_l->buffer_size); + // Payload (ptree compliant JSON string) + this_l->async_read_exactly (this_l->buffer.data (), this_l->buffer_size, [this_l, allow_unsafe]() { + this_l->handle_json_query (allow_unsafe); + }); + }); + } + else if (encoding == static_cast (nano::ipc::payload_encoding::flatbuffers) || encoding == static_cast (nano::ipc::payload_encoding::flatbuffers_json)) + { + // Length of payload + this_l->async_read_exactly (&this_l->buffer_size, sizeof (this_l->buffer_size), [this_l, encoding]() { + boost::endian::big_to_native_inplace (this_l->buffer_size); + this_l->buffer.resize (this_l->buffer_size); + // Payload (flatbuffers or flatbuffers mappable json) + this_l->async_read_exactly (this_l->buffer.data (), this_l->buffer_size, [this_l, encoding]() { + this_l->session_timer.restart (); + + // Lazily create one Flatbuffers handler instance per session + if (!this_l->flatbuffers_handler) + { + this_l->flatbuffers_handler = std::make_shared (this_l->node, this_l->server, this_l->get_subscriber (), this_l->node.config.ipc_config); + } + + if (encoding == static_cast (nano::ipc::payload_encoding::flatbuffers_json)) + { + this_l->flatbuffers_handler->process_json (this_l->buffer.data (), this_l->buffer_size, [this_l](std::shared_ptr body) { + if (this_l->node.config.logging.log_ipc ()) + { + this_l->node.logger.always_log (boost::str (boost::format ("IPC/Flatbuffer request completed in: %1% %2%") % this_l->session_timer.stop ().count () % this_l->session_timer.unit ())); + } + + auto big_endian_length = std::make_shared (boost::endian::native_to_big (static_cast (body->size ()))); + boost::array buffers = { + boost::asio::buffer (big_endian_length.get (), sizeof (std::uint32_t)), + boost::asio::buffer (body->data (), body->size ()) + }; + + this_l->queued_write (buffers, [this_l, body, big_endian_length](boost::system::error_code const & error_a, size_t size_a) { + if (!error_a) + { + this_l->read_next_request (); + } + else if (this_l->node.config.logging.log_ipc ()) + { + this_l->node.logger.always_log ("IPC: Write failed: ", error_a.message ()); + } + }); + }); + } + else + { + this_l->flatbuffers_handler->process (this_l->buffer.data (), this_l->buffer_size, [this_l](std::shared_ptr fbb) { + if (this_l->node.config.logging.log_ipc ()) + { + this_l->node.logger.always_log (boost::str (boost::format ("IPC/Flatbuffer request completed in: %1% %2%") % this_l->session_timer.stop ().count () % this_l->session_timer.unit ())); + } + + auto big_endian_length = std::make_shared (boost::endian::native_to_big (static_cast (fbb->GetSize ()))); + boost::array buffers = { + boost::asio::buffer (big_endian_length.get (), sizeof (std::uint32_t)), + boost::asio::buffer (fbb->GetBufferPointer (), fbb->GetSize ()) + }; + + this_l->queued_write (buffers, [this_l, fbb, big_endian_length](boost::system::error_code const & error_a, size_t size_a) { + if (!error_a) + { + this_l->read_next_request (); + } + else if (this_l->node.config.logging.log_ipc ()) + { + this_l->node.logger.always_log ("IPC: Write failed: ", error_a.message ()); + } + }); + }); + } + }); + }); + } + else if (this_l->node.config.logging.log_ipc ()) + { + this_l->node.logger.always_log ("IPC: Unsupported payload encoding"); + } + }); + } + + /** Shut down and close socket. This is also called if the timer expires. */ + void close () + { + boost::system::error_code ec_ignored; + socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec_ignored); + socket.close (ec_ignored); + } + +private: + /** Holds the buffer and callback for queued writes */ + class queue_item + { + public: + boost::asio::const_buffer buffer; + std::function callback; + }; + size_t const queue_size_max = 64 * 1024; + + nano::ipc::ipc_server & server; + nano::node & node; + + /** Unique session id */ + std::atomic session_id; + + /** Service name associated with this session. This is set through the ServiceRegister API */ + nano::locked service_name; + + /** + * The payload encoding currently in use by this session. This is set as requests are + * received and usually never changes (although a client technically can) + */ + std::atomic active_encoding; + + /** Timer for measuring the duration of ipc calls */ + nano::timer session_timer; + + /** + * IO context from node, or per-transport, depending on configuration. + * Certain transports may scale better if they use a separate context. + */ + boost::asio::io_context & io_ctx; + + /** IO strand for synchronizing */ + boost::asio::strand strand; + + /** The send queue is protected by always being accessed through the strand */ + std::deque send_queue; + + /** A socket of the given asio type */ + SOCKET_TYPE socket; + + /** Buffer sizes are read into this */ + uint32_t buffer_size{ 0 }; + + /** Buffer used to store data received from the client */ + std::vector buffer; + + /** Transport configuration */ + nano::ipc::ipc_config_transport & config_transport; + + /** Handler for Flatbuffers requests. This is created lazily on the first request. */ + std::shared_ptr flatbuffers_handler; + + /** Session subscriber */ + std::shared_ptr subscriber; +}; + +/** Domain and TCP socket transport */ +template +class socket_transport : public nano::ipc::transport +{ +public: + socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE endpoint_a, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) : + server (server_a), config_transport (config_transport_a) + { + // Using a per-transport event dispatcher? + if (concurrency_a > 0) + { + io_ctx = std::make_unique (); + } + + boost::asio::socket_base::reuse_address option (true); + boost::asio::socket_base::keep_alive option_keepalive (true); + acceptor = std::make_unique (context (), endpoint_a); + acceptor->set_option (option); + acceptor->set_option (option_keepalive); + accept (); + + // Start serving IO requests. If concurrency_a is < 1, the node's thread pool/io_context is used instead. + // A separate io_context for domain sockets may facilitate better performance on some systems. + if (concurrency_a > 0) + { + runner = std::make_unique (*io_ctx, static_cast (concurrency_a)); + } + } + + boost::asio::io_context & context () const + { + return io_ctx ? *io_ctx : server.node.io_ctx; + } + + void accept () + { + // Prepare the next session + auto new_session (std::make_shared> (server, context (), config_transport)); + + acceptor->async_accept (new_session->get_socket (), [this, new_session](boost::system::error_code const & ec) { + if (!ec) + { + new_session->read_next_request (); + } + else + { + server.node.logger.always_log ("IPC: acceptor error: ", ec.message ()); + } + + if (ec != boost::asio::error::operation_aborted && acceptor->is_open ()) + { + this->accept (); + } + else + { + server.node.logger.always_log ("IPC: shutting down"); + } + }); + } + + void stop () + { + acceptor->close (); + if (io_ctx) + { + io_ctx->stop (); + } + + if (runner) + { + runner->join (); + } + } + +private: + nano::ipc::ipc_server & server; + nano::ipc::ipc_config_transport & config_transport; + std::unique_ptr runner; + std::unique_ptr io_ctx; + std::unique_ptr acceptor; +}; +} + +/** + * Awaits SIGHUP via signal_set instead of std::signal, as this allows the handler to escape the + * Posix signal handler restrictions + */ +void await_hup_signal (std::shared_ptr const & signals, nano::ipc::ipc_server & server_a) +{ + signals->async_wait ([signals, &server_a](const boost::system::error_code & ec, int signal_number) { + if (ec != boost::asio::error::operation_aborted) + { + std::cout << "Reloading access configuration..." << std::endl; + auto error (server_a.reload_access_config ()); + if (!error) + { + std::cout << "Reloaded access configuration successfully" << std::endl; + } + await_hup_signal (signals, server_a); + } + }); +} + +nano::ipc::ipc_server::ipc_server (nano::node & node_a, nano::node_rpc_config const & node_rpc_config_a) : +node (node_a), +node_rpc_config (node_rpc_config_a), +broker (node_a) +{ + try + { + nano::error access_config_error (reload_access_config ()); + if (access_config_error) + { + std::exit (1); + } +#ifndef _WIN32 + // Hook up config reloading through the HUP signal + auto signals (std::make_shared (node.io_ctx, SIGHUP)); + await_hup_signal (signals, *this); +#endif + if (node_a.config.ipc_config.transport_domain.enabled) + { +#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) + auto threads = node_a.config.ipc_config.transport_domain.io_threads; + file_remover = std::make_unique (node_a.config.ipc_config.transport_domain.path); + boost::asio::local::stream_protocol::endpoint ep{ node_a.config.ipc_config.transport_domain.path }; + transports.push_back (std::make_shared> (*this, ep, node_a.config.ipc_config.transport_domain, threads)); +#else + node.logger.always_log ("IPC: Domain sockets are not supported on this platform"); +#endif + } + + if (node_a.config.ipc_config.transport_tcp.enabled) + { + auto threads = node_a.config.ipc_config.transport_tcp.io_threads; + transports.push_back (std::make_shared> (*this, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), node_a.config.ipc_config.transport_tcp.port), node_a.config.ipc_config.transport_tcp, threads)); + } + + node.logger.always_log ("IPC: server started"); + + if (!transports.empty ()) + { + broker.start (); + } + } + catch (std::runtime_error const & ex) + { + node.logger.always_log ("IPC: ", ex.what ()); + } +} + +nano::ipc::ipc_server::~ipc_server () +{ + node.logger.always_log ("IPC: server stopped"); +} + +void nano::ipc::ipc_server::stop () +{ + for (auto & transport : transports) + { + transport->stop (); + } +} + +nano::ipc::broker & nano::ipc::ipc_server::get_broker () +{ + return broker; +} + +nano::ipc::access & nano::ipc::ipc_server::get_access () +{ + return access; +} + +nano::error nano::ipc::ipc_server::reload_access_config () +{ + nano::error access_config_error (nano::ipc::read_access_config_toml (node.application_path, access)); + if (access_config_error) + { + auto error (boost::str (boost::format ("IPC: invalid access configuration file: %1%") % access_config_error.get_message ())); + std::cerr << error << std::endl; + node.logger.always_log (error); + } + return access_config_error; +} diff --git a/nano/node/ipc/ipc_server.hpp b/nano/node/ipc/ipc_server.hpp new file mode 100644 index 00000000..6815f907 --- /dev/null +++ b/nano/node/ipc/ipc_server.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace flatbuffers +{ +class Parser; +} +namespace nano +{ +class node; +class error; +namespace ipc +{ + class access; + /** The IPC server accepts connections on one or more configured transports */ + class ipc_server final + { + public: + ipc_server (nano::node & node, nano::node_rpc_config const & node_rpc_config); + ~ipc_server (); + void stop (); + + nano::node & node; + nano::node_rpc_config const & node_rpc_config; + + /** Unique counter/id shared across sessions */ + std::atomic id_dispenser{ 1 }; + nano::ipc::broker & get_broker (); + nano::ipc::access & get_access (); + nano::error reload_access_config (); + + private: + void setup_callbacks (); + nano::ipc::broker broker; + nano::ipc::access access; + std::unique_ptr file_remover; + std::vector> transports; + }; +} +} diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 2dc4ad37..400aed30 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -5104,6 +5104,23 @@ void nano::json_handler::work_peers_clear () response_errors (); } +void nano::inprocess_rpc_handler::process_request (std::string const &, std::string const & body_a, std::function response_a) +{ + // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler + auto handler (std::make_shared (node, node_rpc_config, body_a, response_a, [this]() { + this->stop_callback (); + this->stop (); + })); + handler->process_request (); +} + +void nano::inprocess_rpc_handler::process_request_v2 (rpc_handler_request_params const & params_a, std::string const & body_a, std::function)> response_a) +{ + std::string body_l = params_a.json_envelope (body_a); + auto handler (std::make_shared (node, ipc_server, nullptr, node.config.ipc_config)); + handler->process_json (reinterpret_cast (body_l.data ()), body_l.size (), response_a); +} + namespace { void construct_json (nano::container_info_component * component, boost::property_tree::ptree & parent) diff --git a/nano/node/json_handler.hpp b/nano/node/json_handler.hpp index 2a7b6ff7..a17e2a9d 100644 --- a/nano/node/json_handler.hpp +++ b/nano/node/json_handler.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -11,6 +12,10 @@ namespace nano { +namespace ipc +{ + class ipc_server; +} class node; class node_rpc_config; @@ -168,22 +173,16 @@ class inprocess_rpc_handler final : public nano::rpc_handler_interface { public: inprocess_rpc_handler ( - nano::node & node_a, nano::node_rpc_config const & node_rpc_config_a, std::function stop_callback_a = []() {}) : + nano::node & node_a, nano::ipc::ipc_server & ipc_server_a, nano::node_rpc_config const & node_rpc_config_a, std::function stop_callback_a = []() {}) : node (node_a), + ipc_server (ipc_server_a), stop_callback (stop_callback_a), node_rpc_config (node_rpc_config_a) { } - void process_request (std::string const &, std::string const & body_a, std::function response_a) override - { - // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler - auto handler (std::make_shared (node, node_rpc_config, body_a, response_a, [this]() { - this->stop_callback (); - this->stop (); - })); - handler->process_request (); - } + void process_request (std::string const &, std::string const & body_a, std::function response_a) override; + void process_request_v2 (rpc_handler_request_params const & params_a, std::string const & body_a, std::function)> response_a) override; void stop () override { @@ -200,6 +199,7 @@ public: private: nano::node & node; + nano::ipc::ipc_server & ipc_server; boost::optional rpc; std::function stop_callback; nano::node_rpc_config const & node_rpc_config; diff --git a/nano/node/json_payment_observer.cpp b/nano/node/json_payment_observer.cpp index 929aee89..26e51e2a 100644 --- a/nano/node/json_payment_observer.cpp +++ b/nano/node/json_payment_observer.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include #include diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 4032ea4d..fdd49772 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/nano/rpc/rpc_connection.cpp b/nano/rpc/rpc_connection.cpp index ba665d77..f02960af 100644 --- a/nano/rpc/rpc_connection.cpp +++ b/nano/rpc/rpc_connection.cpp @@ -6,6 +6,8 @@ #include #include +#include +#include #ifdef NANO_SECURE_RPC #include #endif @@ -99,11 +101,14 @@ template void nano::rpc_connection::parse_request (STREAM_TYPE & stream, std::shared_ptr> header_parser) { auto this_l (shared_from_this ()); + auto header_field_credentials_l (header_parser->get ()["nano-api-key"]); + auto header_corr_id_l (header_parser->get ()["nano-correlation-id"]); auto body_parser (std::make_shared> (std::move (*header_parser))); - boost::beast::http::async_read (stream, buffer, *body_parser, boost::asio::bind_executor (strand, [this_l, body_parser, &stream](boost::system::error_code const & ec, size_t bytes_transferred) { + auto path_l (body_parser->get ().target ().to_string ()); + boost::beast::http::async_read (stream, buffer, *body_parser, boost::asio::bind_executor (strand, [this_l, body_parser, header_field_credentials_l, header_corr_id_l, path_l, &stream](boost::system::error_code const & ec, size_t bytes_transferred) { if (!ec) { - this_l->io_ctx.post ([this_l, body_parser, &stream]() { + this_l->io_ctx.post ([this_l, body_parser, header_field_credentials_l, header_corr_id_l, path_l, &stream]() { auto & req (body_parser->get ()); auto start (std::chrono::steady_clock::now ()); auto version (req.version ()); @@ -121,13 +126,23 @@ void nano::rpc_connection::parse_request (STREAM_TYPE & stream, std::shared_ptr< ss << "RPC request " << request_id << " completed in: " << std::chrono::duration_cast (std::chrono::steady_clock::now () - start).count () << " microseconds"; this_l->logger.always_log (ss.str ().c_str ()); }); + + std::string api_path_l = "/api/v2"; + int rpc_version_l = boost::starts_with (path_l, api_path_l) ? 2 : 1; + auto method = req.method (); switch (method) { case boost::beast::http::verb::post: { auto handler (std::make_shared (this_l->rpc_config, req.body (), request_id, response_handler, this_l->rpc_handler_interface, this_l->logger)); - handler->process_request (); + nano::rpc_handler_request_params request_params; + request_params.rpc_version = rpc_version_l; + request_params.credentials = header_field_credentials_l.to_string (); + request_params.correlation_id = header_corr_id_l.to_string (); + request_params.path = boost::algorithm::erase_first_copy (path_l, api_path_l); + request_params.path = boost::algorithm::erase_first_copy (request_params.path, "/"); + handler->process_request (request_params); break; } case boost::beast::http::verb::options: diff --git a/nano/rpc/rpc_handler.cpp b/nano/rpc/rpc_handler.cpp index d92611be..c4c60c57 100644 --- a/nano/rpc/rpc_handler.cpp +++ b/nano/rpc/rpc_handler.cpp @@ -26,7 +26,7 @@ logger (logger) { } -void nano::rpc_handler::process_request () +void nano::rpc_handler::process_request (nano::rpc_handler_request_params const & request_params) { try { @@ -50,55 +50,70 @@ void nano::rpc_handler::process_request () } else { - boost::property_tree::ptree request; + if (request_params.rpc_version == 1) { + boost::property_tree::ptree request; + { + std::stringstream ss; + ss << body; + boost::property_tree::read_json (ss, request); + } + + auto action = request.get ("action"); + // Creating same string via stringstream as using it directly is generating a TSAN warning std::stringstream ss; - ss << body; - boost::property_tree::read_json (ss, request); + ss << request_id; + logger.always_log (ss.str (), " ", filter_request (request)); + + // Check if this is a RPC command which requires RPC enabled control + std::error_code rpc_control_disabled_ec = nano::error_rpc::rpc_control_disabled; + + bool error = false; + auto found = rpc_control_impl_set.find (action); + if (found != rpc_control_impl_set.cend () && !rpc_config.enable_control) + { + json_error_response (response, rpc_control_disabled_ec.message ()); + error = true; + } + else + { + // Special case with stats, type -> objects + if (action == "stats" && !rpc_config.enable_control) + { + if (request.get ("type") == "objects") + { + json_error_response (response, rpc_control_disabled_ec.message ()); + error = true; + } + } + else if (action == "process") + { + auto force = request.get_optional ("force").value_or (false); + auto watch_work = request.get_optional ("watch_work").value_or (true); + if ((force || watch_work) && !rpc_config.enable_control) + { + json_error_response (response, rpc_control_disabled_ec.message ()); + error = true; + } + } + } + + if (!error) + { + rpc_handler_interface.process_request (action, body, this->response); + } } - - auto action = request.get ("action"); - // Creating same string via stringstream as using it directly is generating a TSAN warning - std::stringstream ss; - ss << request_id; - logger.always_log (ss.str (), " ", filter_request (request)); - - // Check if this is a RPC command which requires RPC enabled control - std::error_code rpc_control_disabled_ec = nano::error_rpc::rpc_control_disabled; - - bool error = false; - auto found = rpc_control_impl_set.find (action); - if (found != rpc_control_impl_set.cend () && !rpc_config.enable_control) + else if (request_params.rpc_version == 2) { - json_error_response (response, rpc_control_disabled_ec.message ()); - error = true; + rpc_handler_interface.process_request_v2 (request_params, body, [response = response](std::shared_ptr body) { + std::string body_l = *body; + response (body_l); + }); } else { - // Special case with stats, type -> objects - if (action == "stats" && !rpc_config.enable_control) - { - if (request.get ("type") == "objects") - { - json_error_response (response, rpc_control_disabled_ec.message ()); - error = true; - } - } - else if (action == "process") - { - auto force = request.get_optional ("force").value_or (false); - auto watch_work = request.get_optional ("watch_work").value_or (true); - if ((force || watch_work) && !rpc_config.enable_control) - { - json_error_response (response, rpc_control_disabled_ec.message ()); - error = true; - } - } - } - - if (!error) - { - rpc_handler_interface.process_request (action, body, this->response); + assert (false); + json_error_response (response, "Invalid RPC version"); } } } diff --git a/nano/rpc/rpc_handler.hpp b/nano/rpc/rpc_handler.hpp index 47fc6f8e..bc75b6e1 100644 --- a/nano/rpc/rpc_handler.hpp +++ b/nano/rpc/rpc_handler.hpp @@ -10,12 +10,13 @@ namespace nano class rpc_config; class rpc_handler_interface; class logger_mt; +class rpc_handler_request_params; class rpc_handler : public std::enable_shared_from_this { public: rpc_handler (nano::rpc_config const & rpc_config, std::string const & body_a, std::string const & request_id_a, std::function const & response_a, nano::rpc_handler_interface & rpc_handler_interface_a, nano::logger_mt & logger); - void process_request (); + void process_request (nano::rpc_handler_request_params const & request_params); private: std::string body; diff --git a/nano/rpc/rpc_request_processor.cpp b/nano/rpc/rpc_request_processor.cpp index 9d79aa31..c92f59f7 100644 --- a/nano/rpc/rpc_request_processor.cpp +++ b/nano/rpc/rpc_request_processor.cpp @@ -146,7 +146,8 @@ void nano::rpc_request_processor::run () auto connection = *it; connection->is_available = false; // Make sure no one else can take it conditions_lk.unlock (); - auto req (nano::ipc::prepare_request (nano::ipc::payload_encoding::json_legacy, rpc_request->body)); + auto encoding (rpc_request->rpc_api_version == 1 ? nano::ipc::payload_encoding::json_v1 : nano::ipc::payload_encoding::flatbuffers_json); + auto req (nano::ipc::prepare_request (encoding, rpc_request->body)); auto res (std::make_shared> ()); // Have we tried to connect yet? diff --git a/nano/rpc/rpc_request_processor.hpp b/nano/rpc/rpc_request_processor.hpp index 38ced420..3981b246 100644 --- a/nano/rpc/rpc_request_processor.hpp +++ b/nano/rpc/rpc_request_processor.hpp @@ -27,6 +27,17 @@ struct rpc_request { } + rpc_request (int rpc_api_version_a, const std::string & body_a, std::function response_a) : + rpc_api_version (rpc_api_version_a), body (body_a), response (response_a) + { + } + + rpc_request (int rpc_api_version_a, const std::string & action_a, const std::string & body_a, std::function response_a) : + rpc_api_version (rpc_api_version_a), action (action_a), body (body_a), response (response_a) + { + } + + int rpc_api_version{ 1 }; std::string action; std::string body; std::function response; @@ -71,6 +82,15 @@ public: rpc_request_processor.add (std::make_shared (action_a, body_a, response_a)); } + void process_request_v2 (rpc_handler_request_params const & params_a, std::string const & body_a, std::function)> response_a) override + { + std::string body_l = params_a.json_envelope (body_a); + rpc_request_processor.add (std::make_shared (2 /* rpc version */, body_l, [response_a](std::string const & resp) { + auto resp_l (std::make_shared (resp)); + response_a (resp_l); + })); + } + void stop () override { rpc_request_processor.stop (); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 5861a1ff..c01b1b96 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include #include @@ -7362,7 +7362,8 @@ TEST (rpc, in_process) nano::rpc_config rpc_config (nano::get_available_port (), true); rpc_config.rpc_process.ipc_port = node->config.ipc_config.transport_tcp.port; nano::node_rpc_config node_rpc_config; - nano::inprocess_rpc_handler inprocess_rpc_handler (*node, node_rpc_config); + nano::ipc::ipc_server ipc_server (*node, node_rpc_config); + nano::inprocess_rpc_handler inprocess_rpc_handler (*node, ipc_server, node_rpc_config); nano::rpc rpc (system.io_ctx, rpc_config, inprocess_rpc_handler); rpc.start (); boost::property_tree::ptree request;