diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 53241a3c9..77eb01e82 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -135,6 +135,8 @@ add_library( rep_tiers.cpp request_aggregator.hpp request_aggregator.cpp + rpc_callbacks.hpp + rpc_callbacks.cpp scheduler/bucket.cpp scheduler/bucket.hpp scheduler/component.hpp diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 3a8b9a162..922724927 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -33,6 +33,7 @@ class recently_cemented_cache; class recently_confirmed_cache; class rep_crawler; class rep_tiers; +class rpc_callbacks; class telemetry; class unchecked_map; class stats; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index b6838515f..6c50715b4 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -193,6 +194,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy peer_history{ *peer_history_impl }, monitor_impl{ std::make_unique (config.monitor, *this) }, monitor{ *monitor_impl }, + rpc_callbacks_impl{ std::make_unique (*this) }, + rpc_callbacks{ *rpc_callbacks_impl }, startup_time{ std::chrono::steady_clock::now () }, node_seq{ seq } { @@ -250,66 +253,6 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy network.disconnect_observer = [this] () { observers.disconnect.notify (); }; - if (!config.callback_address.empty ()) - { - observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { - auto block_a (status_a.winner); - if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) - { - auto node_l (shared_from_this ()); - io_ctx.post ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { - boost::property_tree::ptree event; - event.add ("account", account_a.to_account ()); - event.add ("hash", block_a->hash ().to_string ()); - std::string block_text; - block_a->serialize_json (block_text); - event.add ("block", block_text); - event.add ("amount", amount_a.to_string_dec ()); - if (is_state_send_a) - { - event.add ("is_send", is_state_send_a); - event.add ("subtype", "send"); - } - // Subtype field - else if (block_a->type () == nano::block_type::state) - { - if (block_a->is_change ()) - { - event.add ("subtype", "change"); - } - else if (is_state_epoch_a) - { - debug_assert (amount_a == 0 && node_l->ledger.is_epoch_link (block_a->link_field ().value ())); - event.add ("subtype", "epoch"); - } - else - { - event.add ("subtype", "receive"); - } - } - std::stringstream ostream; - boost::property_tree::write_json (ostream, event); - ostream.flush (); - auto body (std::make_shared (ostream.str ())); - auto address (node_l->config.callback_address); - auto port (node_l->config.callback_port); - auto target (std::make_shared (node_l->config.callback_target)); - auto resolver (std::make_shared (node_l->io_ctx)); - resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) { - if (!ec) - { - node_l->do_rpc_callback (i_a, address, port, target, body, resolver); - } - else - { - node_l->logger.error (nano::log::type::rpc_callbacks, "Error resolving callback: {}:{} ({})", address, port, ec.message ()); - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - } - }); - }); - } - }); - } observers.channel_connected.add ([this] (std::shared_ptr const & channel) { network.send_keepalive_self (channel); @@ -472,68 +415,6 @@ nano::node::~node () stop (); } -// TODO: Move to a separate class -void nano::node::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const & address, uint16_t port, std::shared_ptr const & target, std::shared_ptr const & body, std::shared_ptr const & resolver) -{ - if (i_a != boost::asio::ip::tcp::resolver::iterator{}) - { - auto node_l (shared_from_this ()); - auto sock (std::make_shared (node_l->io_ctx)); - sock->async_connect (i_a->endpoint (), [node_l, target, body, sock, address, port, i_a, resolver] (boost::system::error_code const & ec) mutable { - if (!ec) - { - auto req (std::make_shared> ()); - req->method (boost::beast::http::verb::post); - req->target (*target); - req->version (11); - req->insert (boost::beast::http::field::host, address); - req->insert (boost::beast::http::field::content_type, "application/json"); - req->body () = *body; - req->prepare_payload (); - boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { - if (!ec) - { - auto sb (std::make_shared ()); - auto resp (std::make_shared> ()); - boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { - if (!ec) - { - if (boost::beast::http::to_status_class (resp->result ()) == boost::beast::http::status_class::successful) - { - node_l->stats.inc (nano::stat::type::http_callback, nano::stat::detail::initiate, nano::stat::dir::out); - } - else - { - node_l->logger.error (nano::log::type::rpc_callbacks, "Callback to {}:{} failed [status: {}]", address, port, nano::util::to_str (resp->result ())); - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - } - } - else - { - node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to complete callback: {}:{} ({})", address, port, ec.message ()); - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - }; - }); - } - else - { - node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to send callback: {}:{} ({})", address, port, ec.message ()); - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - } - }); - } - else - { - node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to connect to callback address({}): {}:{} ({})", address, i_a->endpoint ().address ().to_string (), port, ec.message ()); - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - ++i_a; - - node_l->do_rpc_callback (i_a, address, port, target, body, resolver); - } - }); - } -} - bool nano::node::copy_with_compaction (std::filesystem::path const & destination) { return store.copy_db (destination); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index fc6581829..5105d6040 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -82,7 +82,6 @@ public: bool block_confirmed_or_being_confirmed (nano::secure::transaction const &, nano::block_hash const &); bool block_confirmed_or_being_confirmed (nano::block_hash const &); - void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); bool online () const; bool init_error () const; std::pair> get_bootstrap_weights () const; @@ -201,6 +200,8 @@ public: nano::peer_history & peer_history; std::unique_ptr monitor_impl; nano::monitor & monitor; + std::unique_ptr rpc_callbacks_impl; + nano::rpc_callbacks & rpc_callbacks; public: std::chrono::steady_clock::time_point const startup_time; diff --git a/nano/node/rpc_callbacks.cpp b/nano/node/rpc_callbacks.cpp new file mode 100644 index 000000000..1fa8ad0a2 --- /dev/null +++ b/nano/node/rpc_callbacks.cpp @@ -0,0 +1,139 @@ +#include +#include +#include +#include + +nano::rpc_callbacks::rpc_callbacks (nano::node & node_a) : + node{ node_a }, + config{ node_a.config }, + observers{ node_a.observers }, + ledger{ node_a.ledger }, + logger{ node_a.logger }, + stats{ node_a.stats } +{ + if (!config.callback_address.empty ()) + { + logger.info (nano::log::type::rpc_callbacks, "RPC callbacks enabled on {}:{}", config.callback_address, config.callback_port); + setup_callbacks (); + } +} + +void nano::rpc_callbacks::setup_callbacks () +{ + observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { + auto block_a (status_a.winner); + if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) + { + node.workers.post ([this, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { + boost::property_tree::ptree event; + event.add ("account", account_a.to_account ()); + event.add ("hash", block_a->hash ().to_string ()); + std::string block_text; + block_a->serialize_json (block_text); + event.add ("block", block_text); + event.add ("amount", amount_a.to_string_dec ()); + if (is_state_send_a) + { + event.add ("is_send", is_state_send_a); + event.add ("subtype", "send"); + } + // Subtype field + else if (block_a->type () == nano::block_type::state) + { + if (block_a->is_change ()) + { + event.add ("subtype", "change"); + } + else if (is_state_epoch_a) + { + debug_assert (amount_a == 0 && ledger.is_epoch_link (block_a->link_field ().value ())); + event.add ("subtype", "epoch"); + } + else + { + event.add ("subtype", "receive"); + } + } + std::stringstream ostream; + boost::property_tree::write_json (ostream, event); + ostream.flush (); + auto body (std::make_shared (ostream.str ())); + auto address (config.callback_address); + auto port (config.callback_port); + auto target (std::make_shared (config.callback_target)); + auto resolver (std::make_shared (node.io_ctx)); + resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [this, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) { + if (!ec) + { + do_rpc_callback (i_a, address, port, target, body, resolver); + } + else + { + logger.error (nano::log::type::rpc_callbacks, "Error resolving callback: {}:{} ({})", address, port, ec.message ()); + stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); + } + }); + }); + } + }); +} + +void nano::rpc_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const & address, uint16_t port, std::shared_ptr const & target, std::shared_ptr const & body, std::shared_ptr const & resolver) +{ + if (i_a != boost::asio::ip::tcp::resolver::iterator{}) + { + auto sock (std::make_shared (node.io_ctx)); + sock->async_connect (i_a->endpoint (), [this, target, body, sock, address, port, i_a, resolver] (boost::system::error_code const & ec) mutable { + if (!ec) + { + auto req (std::make_shared> ()); + req->method (boost::beast::http::verb::post); + req->target (*target); + req->version (11); + req->insert (boost::beast::http::field::host, address); + req->insert (boost::beast::http::field::content_type, "application/json"); + req->body () = *body; + req->prepare_payload (); + boost::beast::http::async_write (*sock, *req, [this, sock, address, port, req, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { + if (!ec) + { + auto sb (std::make_shared ()); + auto resp (std::make_shared> ()); + boost::beast::http::async_read (*sock, *sb, *resp, [this, sb, resp, sock, address, port, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { + if (!ec) + { + if (boost::beast::http::to_status_class (resp->result ()) == boost::beast::http::status_class::successful) + { + stats.inc (nano::stat::type::http_callback, nano::stat::detail::initiate, nano::stat::dir::out); + } + else + { + logger.error (nano::log::type::rpc_callbacks, "Callback to {}:{} failed [status: {}]", address, port, nano::util::to_str (resp->result ())); + stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); + } + } + else + { + logger.error (nano::log::type::rpc_callbacks, "Unable to complete callback: {}:{} ({})", address, port, ec.message ()); + stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); + }; + }); + } + else + { + logger.error (nano::log::type::rpc_callbacks, "Unable to send callback: {}:{} ({})", address, port, ec.message ()); + stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); + } + }); + } + else + { + logger.error (nano::log::type::rpc_callbacks, "Unable to connect to callback address({}): {}:{} ({})", address, i_a->endpoint ().address ().to_string (), port, ec.message ()); + stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); + ++i_a; + + do_rpc_callback (i_a, address, port, target, body, resolver); + } + }); + } +} \ No newline at end of file diff --git a/nano/node/rpc_callbacks.hpp b/nano/node/rpc_callbacks.hpp new file mode 100644 index 000000000..1461a3d36 --- /dev/null +++ b/nano/node/rpc_callbacks.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace nano +{ +class rpc_callbacks +{ +public: + explicit rpc_callbacks (nano::node &); + +private: // Dependencies + nano::node_config const & config; + nano::node & node; + nano::node_observers & observers; + nano::ledger & ledger; + nano::logger & logger; + nano::stats & stats; + +private: + void setup_callbacks (); + void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); +}; +} \ No newline at end of file