diff --git a/ci/build-travis.sh b/ci/build-travis.sh index 5a21b49d..08dec124 100755 --- a/ci/build-travis.sh +++ b/ci/build-travis.sh @@ -8,6 +8,12 @@ set -o nounset set -o xtrace OS=`uname` +# This is to prevent out of scope access in async_write from asio which is not picked up by static analysers +if [[ $(grep -rl --exclude="*asio.hpp" "asio::async_write" ./nano) ]]; then + echo "using boost::asio::async_write directly is not permitted (except in nano/lib/asio.hpp). Use nano::async_write instead" + exit 1 +fi + mkdir build pushd build @@ -44,7 +50,6 @@ cmake \ ${SANITIZERS} \ .. - if [[ "$OS" == 'Linux' ]]; then cmake --build ${PWD} -- -j2 else diff --git a/nano/core_test/ipc.cpp b/nano/core_test/ipc.cpp index 1430984d..20404959 100644 --- a/nano/core_test/ipc.cpp +++ b/nano/core_test/ipc.cpp @@ -30,7 +30,7 @@ TEST (ipc, asynchronous) client.async_connect ("::1", 24077, [&client, &req, &res, &call_completed](nano::error err) { client.async_write (req, [&client, &req, &res, &call_completed](nano::error err_a, size_t size_a) { ASSERT_NO_ERROR (static_cast (err_a)); - ASSERT_EQ (size_a, req->size ()); + ASSERT_EQ (size_a, req.size ()); // Read length client.async_read (res, sizeof (uint32_t), [&client, &res, &call_completed](nano::error err_read_a, size_t size_read_a) { ASSERT_NO_ERROR (static_cast (err_read_a)); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index c492416c..d41cd17b 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -1445,13 +1445,13 @@ TEST (bootstrap, tcp_node_id_handshake) auto bootstrap_endpoint (system.nodes[0]->bootstrap.endpoint ()); auto cookie (system.nodes[0]->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (bootstrap_endpoint))); nano::node_id_handshake node_id_handshake (cookie, boost::none); - auto input (node_id_handshake.to_bytes ()); + auto input (node_id_handshake.to_shared_const_buffer ()); std::atomic write_done (false); socket->async_connect (bootstrap_endpoint, [&input, socket, &write_done](boost::system::error_code const & ec) { ASSERT_FALSE (ec); socket->async_write (input, [&input, &write_done](boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); - ASSERT_EQ (input->size (), size_a); + ASSERT_EQ (input.size (), size_a); write_done = true; }); }); @@ -2283,12 +2283,12 @@ TEST (bootstrap, tcp_listener_timeout_node_id_handshake) auto socket (std::make_shared (node0)); auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->bootstrap.endpoint ()))); nano::node_id_handshake node_id_handshake (cookie, boost::none); - auto input (node_id_handshake.to_bytes ()); + auto input (node_id_handshake.to_shared_const_buffer ()); socket->async_connect (node0->bootstrap.endpoint (), [&input, socket](boost::system::error_code const & ec) { ASSERT_FALSE (ec); socket->async_write (input, [&input](boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); - ASSERT_EQ (input->size (), size_a); + ASSERT_EQ (input.size (), size_a); }); }); system.deadline_set (std::chrono::seconds (5)); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 4d1594c8..0ac9a6a4 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -449,10 +449,10 @@ TEST (node, connect_after_junk) { nano::system system (24000, 1); auto node1 (std::make_shared (system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work)); - auto junk_buffer (std::make_shared> ()); - junk_buffer->push_back (0); + std::vector junk_buffer; + junk_buffer.push_back (0); auto channel1 (std::make_shared (node1->network.udp_channels, system.nodes[0]->network.endpoint (), node1->network_params.protocol.protocol_version)); - channel1->send_buffer (junk_buffer, nano::stat::detail::bulk_pull, [](boost::system::error_code const &, size_t) {}); + channel1->send_buffer (nano::shared_const_buffer (std::move (junk_buffer)), nano::stat::detail::bulk_pull, [](boost::system::error_code const &, size_t) {}); system.deadline_set (10s); while (system.nodes[0]->stats.count (nano::stat::type::error) == 0) { @@ -1501,13 +1501,13 @@ TEST (node, fork_no_vote_quorum) ASSERT_FALSE (system.wallet (1)->store.fetch (transaction, key1, key3)); auto vote (std::make_shared (key1, key3, 0, send2)); nano::confirm_ack confirm (vote); - std::shared_ptr> bytes (new std::vector); + std::vector buffer; { - nano::vectorstream stream (*bytes); + nano::vectorstream stream (buffer); confirm.serialize (stream); } nano::transport::channel_udp channel (node2.network.udp_channels, node3.network.endpoint (), node1.network_params.protocol.protocol_version); - channel.send_buffer (bytes, nano::stat::detail::confirm_ack); + channel.send_buffer (nano::shared_const_buffer (std::move (buffer)), nano::stat::detail::confirm_ack); while (node3.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in) < 3) { ASSERT_NO_ERROR (system.poll ()); diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index b9ec79c3..f7c56808 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -116,9 +116,9 @@ TEST (socket, concurrent_writes) client_threads.emplace_back ([&client, &message_count]() { for (int i = 0; i < message_count; i++) { - auto buff (std::make_shared> ()); - buff->push_back ('A' + i); - client->async_write (buff); + std::vector buff; + buff.push_back ('A' + i); + client->async_write (nano::shared_const_buffer (std::move (buff))); } }); #ifndef _WIN32 diff --git a/nano/lib/CMakeLists.txt b/nano/lib/CMakeLists.txt index c6832ac7..5e008f8b 100644 --- a/nano/lib/CMakeLists.txt +++ b/nano/lib/CMakeLists.txt @@ -14,6 +14,8 @@ add_library (nano_lib ${platform_sources} alarm.hpp alarm.cpp + asio.hpp + asio.cpp blockbuilders.hpp blockbuilders.cpp blocks.hpp diff --git a/nano/lib/asio.cpp b/nano/lib/asio.cpp new file mode 100644 index 00000000..fac8c100 --- /dev/null +++ b/nano/lib/asio.cpp @@ -0,0 +1,45 @@ +#include + +nano::shared_const_buffer::shared_const_buffer (const std::vector & data) : +m_data (std::make_shared> (data)), +m_buffer (boost::asio::buffer (*m_data)) +{ +} + +nano::shared_const_buffer::shared_const_buffer (std::vector && data) : +m_data (std::make_shared> (std::move (data))), +m_buffer (boost::asio::buffer (*m_data)) +{ +} + +nano::shared_const_buffer::shared_const_buffer (uint8_t data) : +shared_const_buffer (std::vector{ data }) +{ +} + +nano::shared_const_buffer::shared_const_buffer (std::string const & data) : +m_data (std::make_shared> (data.begin (), data.end ())), +m_buffer (boost::asio::buffer (*m_data)) +{ +} + +nano::shared_const_buffer::shared_const_buffer (std::shared_ptr> const & data) : +m_data (data), +m_buffer (boost::asio::buffer (*m_data)) +{ +} + +const boost::asio::const_buffer * nano::shared_const_buffer::begin () const +{ + return &m_buffer; +} + +const boost::asio::const_buffer * nano::shared_const_buffer::end () const +{ + return &m_buffer + 1; +} + +size_t nano::shared_const_buffer::size () const +{ + return m_buffer.size (); +} diff --git a/nano/lib/asio.hpp b/nano/lib/asio.hpp new file mode 100644 index 00000000..f25c2d3d --- /dev/null +++ b/nano/lib/asio.hpp @@ -0,0 +1,37 @@ +#pragma once + +#include + +namespace nano +{ +class shared_const_buffer +{ +public: + using value_type = boost::asio::const_buffer; + using const_iterator = const boost::asio::const_buffer *; + + explicit shared_const_buffer (std::vector const & data); + explicit shared_const_buffer (uint8_t data); + explicit shared_const_buffer (std::string const & data); + explicit shared_const_buffer (std::vector && data); + explicit shared_const_buffer (std::shared_ptr> const & data); + + const boost::asio::const_buffer * begin () const; + const boost::asio::const_buffer * end () const; + + size_t size () const; + +private: + std::shared_ptr> m_data; + boost::asio::const_buffer m_buffer; +}; + +static_assert (boost::asio::is_const_buffer_sequence::value, "Not ConstBufferSequence compliant"); + +template +BOOST_ASIO_INITFN_RESULT_TYPE (WriteHandler, void(boost::system::error_code, std::size_t)) +async_write (AsyncWriteStream & s, nano::shared_const_buffer const & buffer, WriteHandler && handler) +{ + return boost::asio::async_write (s, buffer, std::move (handler)); +} +} \ No newline at end of file diff --git a/nano/lib/ipc_client.cpp b/nano/lib/ipc_client.cpp index 9922dd5a..df07d33f 100644 --- a/nano/lib/ipc_client.cpp +++ b/nano/lib/ipc_client.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -11,7 +12,7 @@ class channel { public: virtual void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) = 0; - virtual void async_write (std::shared_ptr> buffer_a, std::function callback_a) = 0; + virtual void async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a) = 0; }; /* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */ @@ -68,10 +69,10 @@ public: })); } - void async_write (std::shared_ptr> buffer_a, std::function callback_a) override + void async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a) override { this->timer_start (io_timeout); - boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), boost::asio::bind_executor (this->strand, [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) { + nano::async_write (socket, buffer_a, boost::asio::bind_executor (this->strand, [this, callback_a](boost::system::error_code const & ec, size_t size_a) { this->timer_cancel (); callback_a (ec, size_a); })); @@ -178,7 +179,7 @@ nano::error nano::ipc::ipc_client::connect (std::string const & host, uint16_t p return result_l.get_future ().get (); } -void nano::ipc::ipc_client::async_write (std::shared_ptr> buffer_a, std::function callback_a) +void nano::ipc::ipc_client::async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a) { auto client (boost::polymorphic_downcast (impl.get ())); client->get_channel ().async_write (buffer_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_written_a) { @@ -194,23 +195,23 @@ void nano::ipc::ipc_client::async_read (std::shared_ptr> bu }); } -std::shared_ptr> nano::ipc::prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a) +nano::shared_const_buffer nano::ipc::prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a) { - auto buffer_l (std::make_shared> ()); + std::vector buffer_l; if (encoding_a == nano::ipc::payload_encoding::json_legacy) { - buffer_l->push_back ('N'); - buffer_l->push_back (static_cast (encoding_a)); - buffer_l->push_back (0); - buffer_l->push_back (0); + buffer_l.push_back ('N'); + buffer_l.push_back (static_cast (encoding_a)); + buffer_l.push_back (0); + buffer_l.push_back (0); auto payload_length = static_cast (payload_a.size ()); uint32_t be = boost::endian::native_to_big (payload_length); char * chars = reinterpret_cast (&be); - buffer_l->insert (buffer_l->end (), chars, chars + sizeof (uint32_t)); - buffer_l->insert (buffer_l->end (), payload_a.begin (), payload_a.end ()); + buffer_l.insert (buffer_l.end (), chars, chars + sizeof (uint32_t)); + buffer_l.insert (buffer_l.end (), payload_a.begin (), payload_a.end ()); } - return buffer_l; + return nano::shared_const_buffer{ std::move (buffer_l) }; } std::string nano::ipc::request (nano::ipc::ipc_client & ipc_client, std::string const & rpc_action_a) diff --git a/nano/lib/ipc_client.hpp b/nano/lib/ipc_client.hpp index 0f61dc33..af406c4c 100644 --- a/nano/lib/ipc_client.hpp +++ b/nano/lib/ipc_client.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -12,6 +13,7 @@ namespace nano { +class shared_const_buffer; namespace ipc { class ipc_client_impl @@ -38,7 +40,7 @@ namespace ipc void async_connect (std::string const & host, uint16_t port, std::function callback); /** Write buffer asynchronously */ - void async_write (std::shared_ptr> buffer_a, std::function callback_a); + void async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a); /** Read \p size_a bytes asynchronously */ void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a); @@ -57,6 +59,6 @@ namespace ipc * Returns a buffer with an IPC preamble for the given \p encoding_a followed by the payload. Depending on encoding, * the buffer may contain a payload length or end sentinel. */ - std::shared_ptr> prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a); + nano::shared_const_buffer prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a); } } diff --git a/nano/lib/utility.hpp b/nano/lib/utility.hpp index f1cc299b..d5a7c1ad 100644 --- a/nano/lib/utility.hpp +++ b/nano/lib/utility.hpp @@ -221,6 +221,7 @@ inline std::unique_ptr collect_seq_con_info (observer_se void remove_all_files_in_dir (boost::filesystem::path const & dir); void move_all_files_to_dir (boost::filesystem::path const & from, boost::filesystem::path const & to); } +// Have our own async_write which we must use? void release_assert_internal (bool check, const char * check_expr, const char * file, unsigned int line); #define release_assert(check) release_assert_internal (check, #check, __FILE__, __LINE__) diff --git a/nano/node/bootstrap.cpp b/nano/node/bootstrap.cpp index d884a0db..7b6f7bcd 100644 --- a/nano/node/bootstrap.cpp +++ b/nano/node/bootstrap.cpp @@ -608,8 +608,7 @@ void nano::bulk_push_client::push (nano::transaction const & transaction_a) void nano::bulk_push_client::send_finished () { - auto buffer (std::make_shared> ()); - buffer->push_back (static_cast (nano::block_type::not_a_block)); + nano::shared_const_buffer buffer (static_cast (nano::block_type::not_a_block)); auto this_l (shared_from_this ()); connection->channel->send_buffer (buffer, nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) { try @@ -624,13 +623,13 @@ void nano::bulk_push_client::send_finished () void nano::bulk_push_client::push_block (nano::block const & block_a) { - auto buffer (std::make_shared> ()); + std::vector buffer; { - nano::vectorstream stream (*buffer); + nano::vectorstream stream (buffer); nano::serialize_block (stream, block_a); } auto this_l (shared_from_this ()); - connection->channel->send_buffer (buffer, nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) { + connection->channel->send_buffer (nano::shared_const_buffer (std::move (buffer)), nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) { if (!ec) { auto transaction (this_l->connection->node->store.tx_begin_read ()); @@ -2423,9 +2422,9 @@ public: assert (!nano::validate_message (response->first, *message_a.query, response->second)); auto cookie (connection->node->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (connection->remote_endpoint))); nano::node_id_handshake response_message (cookie, response); - auto bytes = response_message.to_bytes (); + auto shared_const_buffer = response_message.to_shared_const_buffer (); // clang-format off - connection->socket->async_write (bytes, [ bytes, connection = connection ](boost::system::error_code const & ec, size_t size_a) { + connection->socket->async_write (shared_const_buffer, [connection = connection ](boost::system::error_code const & ec, size_t size_a) { if (ec) { if (connection->node->config.logging.network_node_id_handshake_logging ()) @@ -2577,9 +2576,9 @@ void nano::bulk_pull_server::send_next () auto block (get_next ()); if (block != nullptr) { + std::vector send_buffer; { - send_buffer->clear (); - nano::vectorstream stream (*send_buffer); + nano::vectorstream stream (send_buffer); nano::serialize_block (stream, *block); } auto this_l (shared_from_this ()); @@ -2587,7 +2586,7 @@ void nano::bulk_pull_server::send_next () { connection->node->logger.try_log (boost::str (boost::format ("Sending block: %1%") % block->hash ().to_string ())); } - connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { + connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) { this_l->sent_action (ec, size_a); }); } @@ -2686,8 +2685,7 @@ void nano::bulk_pull_server::sent_action (boost::system::error_code const & ec, void nano::bulk_pull_server::send_finished () { - send_buffer->clear (); - send_buffer->push_back (static_cast (nano::block_type::not_a_block)); + nano::shared_const_buffer send_buffer (static_cast (nano::block_type::not_a_block)); auto this_l (shared_from_this ()); if (connection->node->config.logging.bulk_pull_logging ()) { @@ -2716,8 +2714,7 @@ void nano::bulk_pull_server::no_block_sent (boost::system::error_code const & ec nano::bulk_pull_server::bulk_pull_server (std::shared_ptr const & connection_a, std::unique_ptr request_a) : connection (connection_a), -request (std::move (request_a)), -send_buffer (std::make_shared> ()) +request (std::move (request_a)) { set_current_end (); } @@ -2787,17 +2784,16 @@ void nano::bulk_pull_account_server::send_frontier () nano::uint128_union account_frontier_balance (account_frontier_balance_int); // Write the frontier block hash and balance into a buffer - send_buffer->clear (); + std::vector send_buffer; { - nano::vectorstream output_stream (*send_buffer); - + nano::vectorstream output_stream (send_buffer); write (output_stream, account_frontier_hash.bytes); write (output_stream, account_frontier_balance.bytes); } // Send the buffer to the requestor auto this_l (shared_from_this ()); - connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { + connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) { this_l->sent_action (ec, size_a); }); } @@ -2818,11 +2814,11 @@ void nano::bulk_pull_account_server::send_next_block () /* * If we have a new item, emit it to the socket */ - send_buffer->clear (); + std::vector send_buffer; if (pending_address_only) { - nano::vectorstream output_stream (*send_buffer); + nano::vectorstream output_stream (send_buffer); if (connection->node->config.logging.bulk_pull_logging ()) { @@ -2833,7 +2829,7 @@ void nano::bulk_pull_account_server::send_next_block () } else { - nano::vectorstream output_stream (*send_buffer); + nano::vectorstream output_stream (send_buffer); if (connection->node->config.logging.bulk_pull_logging ()) { @@ -2853,7 +2849,7 @@ void nano::bulk_pull_account_server::send_next_block () } auto this_l (shared_from_this ()); - connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { + connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) { this_l->sent_action (ec, size_a); }); } @@ -2975,10 +2971,9 @@ void nano::bulk_pull_account_server::send_finished () * "pending_include_address" flag is not set) or 640-bits of zeros * (if that flag is set). */ - send_buffer->clear (); - + std::vector send_buffer; { - nano::vectorstream output_stream (*send_buffer); + nano::vectorstream output_stream (send_buffer); nano::uint256_union account_zero (0); nano::uint128_union balance_zero (0); @@ -3001,7 +2996,7 @@ void nano::bulk_pull_account_server::send_finished () connection->node->logger.try_log ("Bulk sending for an account finished"); } - connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { + connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) { this_l->complete (ec, size_a); }); } @@ -3040,7 +3035,6 @@ void nano::bulk_pull_account_server::complete (boost::system::error_code const & nano::bulk_pull_account_server::bulk_pull_account_server (std::shared_ptr const & connection_a, std::unique_ptr request_a) : connection (connection_a), request (std::move (request_a)), -send_buffer (std::make_shared> ()), current_key (0, 0) { /* @@ -3190,7 +3184,6 @@ connection (connection_a), current (request_a->start.number () - 1), frontier (0), request (std::move (request_a)), -send_buffer (std::make_shared> ()), count (0) { next (); @@ -3200,9 +3193,9 @@ void nano::frontier_req_server::send_next () { if (!current.is_zero () && count < request->count) { + std::vector send_buffer; { - send_buffer->clear (); - nano::vectorstream stream (*send_buffer); + nano::vectorstream stream (send_buffer); write (stream, current.bytes); write (stream, frontier.bytes); } @@ -3212,7 +3205,7 @@ void nano::frontier_req_server::send_next () connection->node->logger.try_log (boost::str (boost::format ("Sending frontier for %1% %2%") % current.to_account () % frontier.to_string ())); } next (); - connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { + connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) { this_l->sent_action (ec, size_a); }); } @@ -3224,9 +3217,9 @@ void nano::frontier_req_server::send_next () void nano::frontier_req_server::send_finished () { + std::vector send_buffer; { - send_buffer->clear (); - nano::vectorstream stream (*send_buffer); + nano::vectorstream stream (send_buffer); nano::uint256_union zero (0); write (stream, zero.bytes); write (stream, zero.bytes); @@ -3236,7 +3229,7 @@ void nano::frontier_req_server::send_finished () { connection->node->logger.try_log ("Frontier sending finished"); } - connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { + connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) { this_l->no_block_sent (ec, size_a); }); } diff --git a/nano/node/bootstrap.hpp b/nano/node/bootstrap.hpp index e83f480a..ec33fa74 100644 --- a/nano/node/bootstrap.hpp +++ b/nano/node/bootstrap.hpp @@ -344,7 +344,6 @@ public: void no_block_sent (boost::system::error_code const &, size_t); std::shared_ptr connection; std::unique_ptr request; - std::shared_ptr> send_buffer; nano::block_hash current; bool include_start; nano::bulk_pull::count_t max_count; @@ -364,7 +363,6 @@ public: void complete (boost::system::error_code const &, size_t); std::shared_ptr connection; std::unique_ptr request; - std::shared_ptr> send_buffer; std::unordered_set deduplication; nano::pending_key current_key; bool pending_address_only; @@ -396,7 +394,6 @@ public: nano::account current; nano::block_hash frontier; std::unique_ptr request; - std::shared_ptr> send_buffer; size_t count; std::deque> accounts; }; diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 7cd53ba6..a179d169 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -233,13 +234,17 @@ public: virtual ~message () = default; virtual void serialize (nano::stream &) const = 0; virtual void visit (nano::message_visitor &) const = 0; - virtual std::shared_ptr> to_bytes () const + std::shared_ptr> to_bytes () const { auto bytes = std::make_shared> (); nano::vectorstream stream (*bytes); serialize (stream); return bytes; } + nano::shared_const_buffer to_shared_const_buffer () const + { + return shared_const_buffer (to_bytes ()); + } nano::message_header header; }; class work_pool; diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index 78aeb071..b98b3dd7 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -95,20 +95,17 @@ public: // json and write the response to the ipc socket with a length prefix. auto this_l (this->shared_from_this ()); auto response_handler_l ([this_l, request_id_l](std::string const & body) { - this_l->response_body = body; - this_l->size_response = boost::endian::native_to_big (static_cast (this_l->response_body.size ())); - std::vector bufs = { - boost::asio::buffer (&this_l->size_response, sizeof (this_l->size_response)), - boost::asio::buffer (this_l->response_body) - }; - + auto big = boost::endian::native_to_big (static_cast (body.size ())); + std::vector buffer; + buffer.insert (buffer.end (), reinterpret_cast (&big), reinterpret_cast (&big) + sizeof (std::uint32_t)); + buffer.insert (buffer.end (), body.begin (), body.end ()); if (this_l->node.config.logging.log_ipc ()) { this_l->node.logger.always_log (boost::str (boost::format ("IPC/RPC request %1% completed in: %2% %3%") % request_id_l % this_l->session_timer.stop ().count () % this_l->session_timer.unit ())); } this_l->timer_start (std::chrono::seconds (this_l->config_transport.io_timeout)); - boost::asio::async_write (this_l->socket, bufs, [this_l](boost::system::error_code const & error_a, size_t size_a) { + nano::async_write (this_l->socket, nano::shared_const_buffer (buffer), [this_l](boost::system::error_code const & error_a, size_t size_a) { this_l->timer_cancel (); if (!error_a) { @@ -201,10 +198,6 @@ private: /** Buffer sizes are read into this */ uint32_t buffer_size{ 0 }; - /** RPC response */ - std::string response_body; - uint32_t size_response{ 0 }; - /** Buffer used to store data received from the client */ std::vector buffer; diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index 0bd34735..10dc5d47 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -59,7 +59,7 @@ void nano::socket::async_read (std::shared_ptr> buffer_a, s } } -void nano::socket::async_write (std::shared_ptr> buffer_a, std::function callback_a) +void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a) { auto this_l (shared_from_this ()); if (!closed) @@ -86,9 +86,9 @@ void nano::socket::async_write (std::shared_ptr> buffer_a, else { start_timer (); - boost::asio::async_write (tcp_socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), + nano::async_write (tcp_socket, buffer_a, boost::asio::bind_executor (strand, - [this_l, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) { + [this_l, callback_a](boost::system::error_code const & ec, size_t size_a) { if (auto node = this_l->node.lock ()) { node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); @@ -110,7 +110,7 @@ void nano::socket::write_queued_messages () std::weak_ptr this_w (shared_from_this ()); auto msg (send_queue.front ()); start_timer (); - boost::asio::async_write (tcp_socket, boost::asio::buffer (msg.buffer->data (), msg.buffer->size ()), + nano::async_write (tcp_socket, msg.buffer, boost::asio::bind_executor (strand, [msg, this_w](boost::system::error_code ec, std::size_t size_a) { if (auto this_l = this_w.lock ()) diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp index bc4b44d0..9dd291ff 100644 --- a/nano/node/socket.hpp +++ b/nano/node/socket.hpp @@ -1,6 +1,8 @@ #pragma once #include +#include +#include #include @@ -43,7 +45,7 @@ public: virtual ~socket (); void async_connect (boost::asio::ip::tcp::endpoint const &, std::function); void async_read (std::shared_ptr>, size_t, std::function); - void async_write (std::shared_ptr>, std::function = nullptr); + void async_write (nano::shared_const_buffer const &, std::function = nullptr); void close (); boost::asio::ip::tcp::endpoint remote_endpoint () const; @@ -60,7 +62,7 @@ protected: class queue_item { public: - std::shared_ptr> buffer; + nano::shared_const_buffer buffer; std::function callback; }; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 58f6be37..7c1800f3 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -41,20 +41,20 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const & return result; } -void nano::transport::channel_tcp::send_buffer (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) +void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, nano::stat::detail detail_a, std::function const & callback_a) { - socket->async_write (buffer_a, tcp_callback (buffer_a, detail_a, socket->remote_endpoint (), callback_a)); + socket->async_write (buffer_a, tcp_callback (detail_a, socket->remote_endpoint (), callback_a)); } -std::function nano::transport::channel_tcp::callback (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) const +std::function nano::transport::channel_tcp::callback (nano::stat::detail detail_a, std::function const & callback_a) const { return callback_a; } -std::function nano::transport::channel_tcp::tcp_callback (std::shared_ptr> buffer_a, nano::stat::detail detail_a, nano::tcp_endpoint const & endpoint_a, std::function const & callback_a) const +std::function nano::transport::channel_tcp::tcp_callback (nano::stat::detail detail_a, nano::tcp_endpoint const & endpoint_a, std::function const & callback_a) const { // clang-format off - return [ buffer_a, endpoint_a, node = std::weak_ptr (node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a) + return [endpoint_a, node = std::weak_ptr (node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a) { if (auto node_l = node.lock ()) { @@ -489,7 +489,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a // TCP node ID handshake auto cookie (node_l->network.syn_cookies.assign (endpoint_a)); nano::node_id_handshake message (cookie, boost::none); - auto bytes = message.to_bytes (); + auto bytes = message.to_shared_const_buffer (); if (node_l->config.logging.network_node_id_handshake_logging ()) { node_l->logger.try_log (boost::str (boost::format ("Node ID handshake request sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*cookie).to_string ())); @@ -558,7 +558,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptrset_last_packet_received (std::chrono::steady_clock::now ()); boost::optional> response (std::make_pair (node_l->node_id.pub, nano::sign_message (node_l->node_id.prv, node_l->node_id.pub, *message.query))); nano::node_id_handshake response_message (boost::none, response); - auto bytes = response_message.to_bytes (); + auto bytes = response_message.to_shared_const_buffer (); if (node_l->config.logging.network_node_id_handshake_logging ()) { node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ())); diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index d4444428..22a301cc 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -27,9 +27,9 @@ namespace transport ~channel_tcp (); size_t hash_code () const override; bool operator== (nano::transport::channel const &) const override; - void send_buffer (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) override; - std::function callback (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const override; - std::function tcp_callback (std::shared_ptr>, nano::stat::detail, nano::tcp_endpoint const &, std::function const & = nullptr) const; + void send_buffer (nano::shared_const_buffer const &, nano::stat::detail, std::function const & = nullptr) override; + std::function callback (nano::stat::detail, std::function const & = nullptr) const override; + std::function tcp_callback (nano::stat::detail, nano::tcp_endpoint const &, std::function const & = nullptr) const; std::string to_string () const override; bool operator== (nano::transport::channel_tcp const & other_a) const { diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 7e7282f9..fcf513f0 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -80,9 +80,9 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct { callback_visitor visitor; message_a.visit (visitor); - auto buffer (message_a.to_bytes ()); + auto buffer (message_a.to_shared_const_buffer ()); auto detail (visitor.result); - if (!is_droppable_a || !limiter.should_drop (buffer->size ())) + if (!is_droppable_a || !limiter.should_drop (buffer.size ())) { send_buffer (buffer, detail, callback_a); node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out); @@ -93,7 +93,7 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct if (node.config.logging.network_packet_logging ()) { auto key = static_cast (detail) << 8; - node.logger.always_log (boost::str (boost::format ("%1% of size %2% dropped") % node.stats.detail_to_string (key) % buffer->size ())); + node.logger.always_log (boost::str (boost::format ("%1% of size %2% dropped") % node.stats.detail_to_string (key) % buffer.size ())); } } } diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 13f77d52..27d941b2 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -54,8 +54,8 @@ namespace transport virtual size_t hash_code () const = 0; virtual bool operator== (nano::transport::channel const &) const = 0; void send (nano::message const &, std::function const & = nullptr, bool const = true); - virtual void send_buffer (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) = 0; - virtual std::function callback (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const = 0; + virtual void send_buffer (nano::shared_const_buffer const &, nano::stat::detail, std::function const & = nullptr) = 0; + virtual std::function callback (nano::stat::detail, std::function const & = nullptr) const = 0; virtual std::string to_string () const = 0; virtual nano::endpoint get_endpoint () const = 0; virtual nano::tcp_endpoint get_tcp_endpoint () const = 0; diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index 949ee2ea..84dc8c7c 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -29,16 +29,16 @@ bool nano::transport::channel_udp::operator== (nano::transport::channel const & return result; } -void nano::transport::channel_udp::send_buffer (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) +void nano::transport::channel_udp::send_buffer (nano::shared_const_buffer const & buffer_a, nano::stat::detail detail_a, std::function const & callback_a) { set_last_packet_sent (std::chrono::steady_clock::now ()); - channels.send (boost::asio::const_buffer (buffer_a->data (), buffer_a->size ()), endpoint, callback (buffer_a, detail_a, callback_a)); + channels.send (buffer_a, endpoint, callback (detail_a, callback_a)); } -std::function nano::transport::channel_udp::callback (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) const +std::function nano::transport::channel_udp::callback (nano::stat::detail detail_a, std::function const & callback_a) const { // clang-format off - return [ buffer_a, node = std::weak_ptr (channels.node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a) + return [node = std::weak_ptr (channels.node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a) { if (auto node_l = node.lock ()) { @@ -80,7 +80,7 @@ socket (node_a.io_ctx, nano::endpoint (boost::asio::ip::address_v6::any (), port local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), port); } -void nano::transport::udp_channels::send (boost::asio::const_buffer buffer_a, nano::endpoint endpoint_a, std::function const & callback_a) +void nano::transport::udp_channels::send (nano::shared_const_buffer const & buffer_a, nano::endpoint endpoint_a, std::function const & callback_a) { boost::asio::post (strand, [this, buffer_a, endpoint_a, callback_a]() { diff --git a/nano/node/transport/udp.hpp b/nano/node/transport/udp.hpp index 8f28d5f9..42ee0e66 100644 --- a/nano/node/transport/udp.hpp +++ b/nano/node/transport/udp.hpp @@ -27,8 +27,8 @@ namespace transport channel_udp (nano::transport::udp_channels &, nano::endpoint const &, uint8_t protocol_version); size_t hash_code () const override; bool operator== (nano::transport::channel const &) const override; - void send_buffer (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) override; - std::function callback (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const override; + void send_buffer (nano::shared_const_buffer const &, nano::stat::detail, std::function const & = nullptr) override; + std::function callback (nano::stat::detail, std::function const & = nullptr) const override; std::string to_string () const override; bool operator== (nano::transport::channel_udp const & other_a) const { @@ -77,7 +77,7 @@ namespace transport void receive (); void start (); void stop (); - void send (boost::asio::const_buffer buffer_a, nano::endpoint endpoint_a, std::function const & callback_a); + void send (nano::shared_const_buffer const & buffer_a, nano::endpoint endpoint_a, std::function const & callback_a); nano::endpoint get_local_endpoint () const; void receive_action (nano::message_buffer *); void process_packets (); diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index ec93e451..c7cf990c 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -244,14 +244,13 @@ void nano::websocket::session::write (nano::websocket::message message_a) void nano::websocket::session::write_queued_messages () { - auto msg (send_queue.front ()); - auto msg_str (msg.to_string ()); + auto msg (send_queue.front ().to_string ()); auto this_l (shared_from_this ()); // clang-format off - ws.async_write (boost::asio::buffer (msg_str->data (), msg_str->size ()), + ws.async_write (nano::shared_const_buffer (msg), boost::asio::bind_executor (strand, - [msg_str, this_l](boost::system::error_code ec, std::size_t bytes_transferred) { + [this_l](boost::system::error_code ec, std::size_t bytes_transferred) { this_l->send_queue.pop_front (); if (!ec) { @@ -673,10 +672,10 @@ void nano::websocket::message_builder::set_common_fields (nano::websocket::messa message_a.contents.add ("time", std::to_string (milli_since_epoch)); } -std::shared_ptr nano::websocket::message::to_string () const +std::string nano::websocket::message::to_string () const { std::ostringstream ostream; boost::property_tree::write_json (ostream, contents); ostream.flush (); - return std::make_shared (ostream.str ()); + return ostream.str (); } diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index 663581fd..177dae63 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -70,7 +70,7 @@ namespace websocket { } - std::shared_ptr to_string () const; + std::string to_string () const; nano::websocket::topic topic; boost::property_tree::ptree contents; }; diff --git a/nano/rpc/rpc_connection.cpp b/nano/rpc/rpc_connection.cpp index b8596700..751033ab 100644 --- a/nano/rpc/rpc_connection.cpp +++ b/nano/rpc/rpc_connection.cpp @@ -48,7 +48,6 @@ void nano::rpc_connection::write_result (std::string body, unsigned version, boo else { assert (false && "RPC already responded and should only respond once"); - // Guards `res' from being clobbered while async_write is being serviced } } diff --git a/nano/rpc/rpc_request_processor.cpp b/nano/rpc/rpc_request_processor.cpp index 457c0ab1..a66e5997 100644 --- a/nano/rpc/rpc_request_processor.cpp +++ b/nano/rpc/rpc_request_processor.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -83,7 +84,7 @@ void nano::rpc_request_processor::make_available (nano::ipc_connection & connect } // Connection does not exist or has been closed, try to connect to it again and then resend IPC request -void nano::rpc_request_processor::try_reconnect_and_execute_request (std::shared_ptr connection, std::shared_ptr> req, std::shared_ptr> res, std::shared_ptr rpc_request) +void nano::rpc_request_processor::try_reconnect_and_execute_request (std::shared_ptr connection, nano::shared_const_buffer const & req, std::shared_ptr> res, std::shared_ptr rpc_request) { connection->client.async_connect (ipc_address, ipc_port, [this, connection, req, res, rpc_request](nano::error err) { if (!err) diff --git a/nano/rpc/rpc_request_processor.hpp b/nano/rpc/rpc_request_processor.hpp index 0b924daf..42225257 100644 --- a/nano/rpc/rpc_request_processor.hpp +++ b/nano/rpc/rpc_request_processor.hpp @@ -48,7 +48,7 @@ public: private: void run (); void read_payload (std::shared_ptr connection, std::shared_ptr> res, std::shared_ptr rpc_request); - void try_reconnect_and_execute_request (std::shared_ptr connection, std::shared_ptr> req, std::shared_ptr> res, std::shared_ptr rpc_request); + void try_reconnect_and_execute_request (std::shared_ptr connection, nano::shared_const_buffer const & req, std::shared_ptr> res, std::shared_ptr rpc_request); void make_available (nano::ipc_connection & connection); std::vector> connections;