Wrap boost::asio::async_write to ensure lifetime of buffers (#2240)

* Wrap boost::asio::async_write to ensure buffer lifetime

* Formatting and add static assert for checking buffer sequence is correct

* Readd aliases to shared_const_buffer
This commit is contained in:
Wesley Shillingford 2019-08-29 22:34:02 +01:00 committed by GitHub
commit 86d20d556d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 201 additions and 119 deletions

View file

@ -8,6 +8,12 @@ set -o nounset
set -o xtrace
OS=`uname`
# This is to prevent out of scope access in async_write from asio which is not picked up by static analysers
if [[ $(grep -rl --exclude="*asio.hpp" "asio::async_write" ./nano) ]]; then
echo "using boost::asio::async_write directly is not permitted (except in nano/lib/asio.hpp). Use nano::async_write instead"
exit 1
fi
mkdir build
pushd build
@ -44,7 +50,6 @@ cmake \
${SANITIZERS} \
..
if [[ "$OS" == 'Linux' ]]; then
cmake --build ${PWD} -- -j2
else

View file

@ -30,7 +30,7 @@ TEST (ipc, asynchronous)
client.async_connect ("::1", 24077, [&client, &req, &res, &call_completed](nano::error err) {
client.async_write (req, [&client, &req, &res, &call_completed](nano::error err_a, size_t size_a) {
ASSERT_NO_ERROR (static_cast<std::error_code> (err_a));
ASSERT_EQ (size_a, req->size ());
ASSERT_EQ (size_a, req.size ());
// Read length
client.async_read (res, sizeof (uint32_t), [&client, &res, &call_completed](nano::error err_read_a, size_t size_read_a) {
ASSERT_NO_ERROR (static_cast<std::error_code> (err_read_a));

View file

@ -1445,13 +1445,13 @@ TEST (bootstrap, tcp_node_id_handshake)
auto bootstrap_endpoint (system.nodes[0]->bootstrap.endpoint ());
auto cookie (system.nodes[0]->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (bootstrap_endpoint)));
nano::node_id_handshake node_id_handshake (cookie, boost::none);
auto input (node_id_handshake.to_bytes ());
auto input (node_id_handshake.to_shared_const_buffer ());
std::atomic<bool> write_done (false);
socket->async_connect (bootstrap_endpoint, [&input, socket, &write_done](boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
socket->async_write (input, [&input, &write_done](boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
ASSERT_EQ (input->size (), size_a);
ASSERT_EQ (input.size (), size_a);
write_done = true;
});
});
@ -2283,12 +2283,12 @@ TEST (bootstrap, tcp_listener_timeout_node_id_handshake)
auto socket (std::make_shared<nano::socket> (node0));
auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->bootstrap.endpoint ())));
nano::node_id_handshake node_id_handshake (cookie, boost::none);
auto input (node_id_handshake.to_bytes ());
auto input (node_id_handshake.to_shared_const_buffer ());
socket->async_connect (node0->bootstrap.endpoint (), [&input, socket](boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
socket->async_write (input, [&input](boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
ASSERT_EQ (input->size (), size_a);
ASSERT_EQ (input.size (), size_a);
});
});
system.deadline_set (std::chrono::seconds (5));

View file

@ -449,10 +449,10 @@ TEST (node, connect_after_junk)
{
nano::system system (24000, 1);
auto node1 (std::make_shared<nano::node> (system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work));
auto junk_buffer (std::make_shared<std::vector<uint8_t>> ());
junk_buffer->push_back (0);
std::vector<uint8_t> junk_buffer;
junk_buffer.push_back (0);
auto channel1 (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, system.nodes[0]->network.endpoint (), node1->network_params.protocol.protocol_version));
channel1->send_buffer (junk_buffer, nano::stat::detail::bulk_pull, [](boost::system::error_code const &, size_t) {});
channel1->send_buffer (nano::shared_const_buffer (std::move (junk_buffer)), nano::stat::detail::bulk_pull, [](boost::system::error_code const &, size_t) {});
system.deadline_set (10s);
while (system.nodes[0]->stats.count (nano::stat::type::error) == 0)
{
@ -1501,13 +1501,13 @@ TEST (node, fork_no_vote_quorum)
ASSERT_FALSE (system.wallet (1)->store.fetch (transaction, key1, key3));
auto vote (std::make_shared<nano::vote> (key1, key3, 0, send2));
nano::confirm_ack confirm (vote);
std::shared_ptr<std::vector<uint8_t>> bytes (new std::vector<uint8_t>);
std::vector<uint8_t> buffer;
{
nano::vectorstream stream (*bytes);
nano::vectorstream stream (buffer);
confirm.serialize (stream);
}
nano::transport::channel_udp channel (node2.network.udp_channels, node3.network.endpoint (), node1.network_params.protocol.protocol_version);
channel.send_buffer (bytes, nano::stat::detail::confirm_ack);
channel.send_buffer (nano::shared_const_buffer (std::move (buffer)), nano::stat::detail::confirm_ack);
while (node3.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in) < 3)
{
ASSERT_NO_ERROR (system.poll ());

View file

@ -116,9 +116,9 @@ TEST (socket, concurrent_writes)
client_threads.emplace_back ([&client, &message_count]() {
for (int i = 0; i < message_count; i++)
{
auto buff (std::make_shared<std::vector<uint8_t>> ());
buff->push_back ('A' + i);
client->async_write (buff);
std::vector<uint8_t> buff;
buff.push_back ('A' + i);
client->async_write (nano::shared_const_buffer (std::move (buff)));
}
});
#ifndef _WIN32

View file

@ -14,6 +14,8 @@ add_library (nano_lib
${platform_sources}
alarm.hpp
alarm.cpp
asio.hpp
asio.cpp
blockbuilders.hpp
blockbuilders.cpp
blocks.hpp

45
nano/lib/asio.cpp Normal file
View file

@ -0,0 +1,45 @@
#include <nano/lib/asio.hpp>
nano::shared_const_buffer::shared_const_buffer (const std::vector<uint8_t> & data) :
m_data (std::make_shared<std::vector<uint8_t>> (data)),
m_buffer (boost::asio::buffer (*m_data))
{
}
nano::shared_const_buffer::shared_const_buffer (std::vector<uint8_t> && data) :
m_data (std::make_shared<std::vector<uint8_t>> (std::move (data))),
m_buffer (boost::asio::buffer (*m_data))
{
}
nano::shared_const_buffer::shared_const_buffer (uint8_t data) :
shared_const_buffer (std::vector<uint8_t>{ data })
{
}
nano::shared_const_buffer::shared_const_buffer (std::string const & data) :
m_data (std::make_shared<std::vector<uint8_t>> (data.begin (), data.end ())),
m_buffer (boost::asio::buffer (*m_data))
{
}
nano::shared_const_buffer::shared_const_buffer (std::shared_ptr<std::vector<uint8_t>> const & data) :
m_data (data),
m_buffer (boost::asio::buffer (*m_data))
{
}
const boost::asio::const_buffer * nano::shared_const_buffer::begin () const
{
return &m_buffer;
}
const boost::asio::const_buffer * nano::shared_const_buffer::end () const
{
return &m_buffer + 1;
}
size_t nano::shared_const_buffer::size () const
{
return m_buffer.size ();
}

37
nano/lib/asio.hpp Normal file
View file

@ -0,0 +1,37 @@
#pragma once
#include <nano/boost/asio.hpp>
namespace nano
{
class shared_const_buffer
{
public:
using value_type = boost::asio::const_buffer;
using const_iterator = const boost::asio::const_buffer *;
explicit shared_const_buffer (std::vector<uint8_t> const & data);
explicit shared_const_buffer (uint8_t data);
explicit shared_const_buffer (std::string const & data);
explicit shared_const_buffer (std::vector<uint8_t> && data);
explicit shared_const_buffer (std::shared_ptr<std::vector<uint8_t>> const & data);
const boost::asio::const_buffer * begin () const;
const boost::asio::const_buffer * end () const;
size_t size () const;
private:
std::shared_ptr<std::vector<uint8_t>> m_data;
boost::asio::const_buffer m_buffer;
};
static_assert (boost::asio::is_const_buffer_sequence<shared_const_buffer>::value, "Not ConstBufferSequence compliant");
template <typename AsyncWriteStream, typename WriteHandler>
BOOST_ASIO_INITFN_RESULT_TYPE (WriteHandler, void(boost::system::error_code, std::size_t))
async_write (AsyncWriteStream & s, nano::shared_const_buffer const & buffer, WriteHandler && handler)
{
return boost::asio::async_write (s, buffer, std::move (handler));
}
}

View file

@ -1,3 +1,4 @@
#include <nano/lib/asio.hpp>
#include <nano/lib/ipc.hpp>
#include <nano/lib/ipc_client.hpp>
@ -11,7 +12,7 @@ class channel
{
public:
virtual void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) = 0;
virtual void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) = 0;
virtual void async_write (nano::shared_const_buffer const & buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) = 0;
};
/* Boost v1.70 introduced breaking changes; the conditional compilation allows 1.6x to be supported as well. */
@ -68,10 +69,10 @@ public:
}));
}
void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) override
void async_write (nano::shared_const_buffer const & buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a) override
{
this->timer_start (io_timeout);
boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), boost::asio::bind_executor (this->strand, [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) {
nano::async_write (socket, buffer_a, boost::asio::bind_executor (this->strand, [this, callback_a](boost::system::error_code const & ec, size_t size_a) {
this->timer_cancel ();
callback_a (ec, size_a);
}));
@ -178,7 +179,7 @@ nano::error nano::ipc::ipc_client::connect (std::string const & host, uint16_t p
return result_l.get_future ().get ();
}
void nano::ipc::ipc_client::async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(nano::error, size_t)> callback_a)
void nano::ipc::ipc_client::async_write (nano::shared_const_buffer const & buffer_a, std::function<void(nano::error, size_t)> callback_a)
{
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
client->get_channel ().async_write (buffer_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_written_a) {
@ -194,23 +195,23 @@ void nano::ipc::ipc_client::async_read (std::shared_ptr<std::vector<uint8_t>> bu
});
}
std::shared_ptr<std::vector<uint8_t>> nano::ipc::prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a)
nano::shared_const_buffer nano::ipc::prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a)
{
auto buffer_l (std::make_shared<std::vector<uint8_t>> ());
std::vector<uint8_t> buffer_l;
if (encoding_a == nano::ipc::payload_encoding::json_legacy)
{
buffer_l->push_back ('N');
buffer_l->push_back (static_cast<uint8_t> (encoding_a));
buffer_l->push_back (0);
buffer_l->push_back (0);
buffer_l.push_back ('N');
buffer_l.push_back (static_cast<uint8_t> (encoding_a));
buffer_l.push_back (0);
buffer_l.push_back (0);
auto payload_length = static_cast<uint32_t> (payload_a.size ());
uint32_t be = boost::endian::native_to_big (payload_length);
char * chars = reinterpret_cast<char *> (&be);
buffer_l->insert (buffer_l->end (), chars, chars + sizeof (uint32_t));
buffer_l->insert (buffer_l->end (), payload_a.begin (), payload_a.end ());
buffer_l.insert (buffer_l.end (), chars, chars + sizeof (uint32_t));
buffer_l.insert (buffer_l.end (), payload_a.begin (), payload_a.end ());
}
return buffer_l;
return nano::shared_const_buffer{ std::move (buffer_l) };
}
std::string nano::ipc::request (nano::ipc::ipc_client & ipc_client, std::string const & rpc_action_a)

View file

@ -3,6 +3,7 @@
#include <nano/boost/asio.hpp>
#include <nano/lib/errors.hpp>
#include <nano/lib/ipc.hpp>
#include <nano/lib/utility.hpp>
#include <boost/property_tree/ptree.hpp>
@ -12,6 +13,7 @@
namespace nano
{
class shared_const_buffer;
namespace ipc
{
class ipc_client_impl
@ -38,7 +40,7 @@ namespace ipc
void async_connect (std::string const & host, uint16_t port, std::function<void(nano::error)> callback);
/** Write buffer asynchronously */
void async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(nano::error, size_t)> callback_a);
void async_write (nano::shared_const_buffer const & buffer_a, std::function<void(nano::error, size_t)> callback_a);
/** Read \p size_a bytes asynchronously */
void async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(nano::error, size_t)> callback_a);
@ -57,6 +59,6 @@ namespace ipc
* Returns a buffer with an IPC preamble for the given \p encoding_a followed by the payload. Depending on encoding,
* the buffer may contain a payload length or end sentinel.
*/
std::shared_ptr<std::vector<uint8_t>> prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a);
nano::shared_const_buffer prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a);
}
}

View file

@ -221,6 +221,7 @@ inline std::unique_ptr<seq_con_info_component> collect_seq_con_info (observer_se
void remove_all_files_in_dir (boost::filesystem::path const & dir);
void move_all_files_to_dir (boost::filesystem::path const & from, boost::filesystem::path const & to);
}
// Have our own async_write which we must use?
void release_assert_internal (bool check, const char * check_expr, const char * file, unsigned int line);
#define release_assert(check) release_assert_internal (check, #check, __FILE__, __LINE__)

View file

@ -608,8 +608,7 @@ void nano::bulk_push_client::push (nano::transaction const & transaction_a)
void nano::bulk_push_client::send_finished ()
{
auto buffer (std::make_shared<std::vector<uint8_t>> ());
buffer->push_back (static_cast<uint8_t> (nano::block_type::not_a_block));
nano::shared_const_buffer buffer (static_cast<uint8_t> (nano::block_type::not_a_block));
auto this_l (shared_from_this ());
connection->channel->send_buffer (buffer, nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) {
try
@ -624,13 +623,13 @@ void nano::bulk_push_client::send_finished ()
void nano::bulk_push_client::push_block (nano::block const & block_a)
{
auto buffer (std::make_shared<std::vector<uint8_t>> ());
std::vector<uint8_t> buffer;
{
nano::vectorstream stream (*buffer);
nano::vectorstream stream (buffer);
nano::serialize_block (stream, block_a);
}
auto this_l (shared_from_this ());
connection->channel->send_buffer (buffer, nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) {
connection->channel->send_buffer (nano::shared_const_buffer (std::move (buffer)), nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
auto transaction (this_l->connection->node->store.tx_begin_read ());
@ -2423,9 +2422,9 @@ public:
assert (!nano::validate_message (response->first, *message_a.query, response->second));
auto cookie (connection->node->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (connection->remote_endpoint)));
nano::node_id_handshake response_message (cookie, response);
auto bytes = response_message.to_bytes ();
auto shared_const_buffer = response_message.to_shared_const_buffer ();
// clang-format off
connection->socket->async_write (bytes, [ bytes, connection = connection ](boost::system::error_code const & ec, size_t size_a) {
connection->socket->async_write (shared_const_buffer, [connection = connection ](boost::system::error_code const & ec, size_t size_a) {
if (ec)
{
if (connection->node->config.logging.network_node_id_handshake_logging ())
@ -2577,9 +2576,9 @@ void nano::bulk_pull_server::send_next ()
auto block (get_next ());
if (block != nullptr)
{
std::vector<uint8_t> send_buffer;
{
send_buffer->clear ();
nano::vectorstream stream (*send_buffer);
nano::vectorstream stream (send_buffer);
nano::serialize_block (stream, *block);
}
auto this_l (shared_from_this ());
@ -2587,7 +2586,7 @@ void nano::bulk_pull_server::send_next ()
{
connection->node->logger.try_log (boost::str (boost::format ("Sending block: %1%") % block->hash ().to_string ()));
}
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
@ -2686,8 +2685,7 @@ void nano::bulk_pull_server::sent_action (boost::system::error_code const & ec,
void nano::bulk_pull_server::send_finished ()
{
send_buffer->clear ();
send_buffer->push_back (static_cast<uint8_t> (nano::block_type::not_a_block));
nano::shared_const_buffer send_buffer (static_cast<uint8_t> (nano::block_type::not_a_block));
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
{
@ -2716,8 +2714,7 @@ void nano::bulk_pull_server::no_block_sent (boost::system::error_code const & ec
nano::bulk_pull_server::bulk_pull_server (std::shared_ptr<nano::bootstrap_server> const & connection_a, std::unique_ptr<nano::bulk_pull> request_a) :
connection (connection_a),
request (std::move (request_a)),
send_buffer (std::make_shared<std::vector<uint8_t>> ())
request (std::move (request_a))
{
set_current_end ();
}
@ -2787,17 +2784,16 @@ void nano::bulk_pull_account_server::send_frontier ()
nano::uint128_union account_frontier_balance (account_frontier_balance_int);
// Write the frontier block hash and balance into a buffer
send_buffer->clear ();
std::vector<uint8_t> send_buffer;
{
nano::vectorstream output_stream (*send_buffer);
nano::vectorstream output_stream (send_buffer);
write (output_stream, account_frontier_hash.bytes);
write (output_stream, account_frontier_balance.bytes);
}
// Send the buffer to the requestor
auto this_l (shared_from_this ());
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
@ -2818,11 +2814,11 @@ void nano::bulk_pull_account_server::send_next_block ()
/*
* If we have a new item, emit it to the socket
*/
send_buffer->clear ();
std::vector<uint8_t> send_buffer;
if (pending_address_only)
{
nano::vectorstream output_stream (*send_buffer);
nano::vectorstream output_stream (send_buffer);
if (connection->node->config.logging.bulk_pull_logging ())
{
@ -2833,7 +2829,7 @@ void nano::bulk_pull_account_server::send_next_block ()
}
else
{
nano::vectorstream output_stream (*send_buffer);
nano::vectorstream output_stream (send_buffer);
if (connection->node->config.logging.bulk_pull_logging ())
{
@ -2853,7 +2849,7 @@ void nano::bulk_pull_account_server::send_next_block ()
}
auto this_l (shared_from_this ());
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
@ -2975,10 +2971,9 @@ void nano::bulk_pull_account_server::send_finished ()
* "pending_include_address" flag is not set) or 640-bits of zeros
* (if that flag is set).
*/
send_buffer->clear ();
std::vector<uint8_t> send_buffer;
{
nano::vectorstream output_stream (*send_buffer);
nano::vectorstream output_stream (send_buffer);
nano::uint256_union account_zero (0);
nano::uint128_union balance_zero (0);
@ -3001,7 +2996,7 @@ void nano::bulk_pull_account_server::send_finished ()
connection->node->logger.try_log ("Bulk sending for an account finished");
}
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->complete (ec, size_a);
});
}
@ -3040,7 +3035,6 @@ void nano::bulk_pull_account_server::complete (boost::system::error_code const &
nano::bulk_pull_account_server::bulk_pull_account_server (std::shared_ptr<nano::bootstrap_server> const & connection_a, std::unique_ptr<nano::bulk_pull_account> request_a) :
connection (connection_a),
request (std::move (request_a)),
send_buffer (std::make_shared<std::vector<uint8_t>> ()),
current_key (0, 0)
{
/*
@ -3190,7 +3184,6 @@ connection (connection_a),
current (request_a->start.number () - 1),
frontier (0),
request (std::move (request_a)),
send_buffer (std::make_shared<std::vector<uint8_t>> ()),
count (0)
{
next ();
@ -3200,9 +3193,9 @@ void nano::frontier_req_server::send_next ()
{
if (!current.is_zero () && count < request->count)
{
std::vector<uint8_t> send_buffer;
{
send_buffer->clear ();
nano::vectorstream stream (*send_buffer);
nano::vectorstream stream (send_buffer);
write (stream, current.bytes);
write (stream, frontier.bytes);
}
@ -3212,7 +3205,7 @@ void nano::frontier_req_server::send_next ()
connection->node->logger.try_log (boost::str (boost::format ("Sending frontier for %1% %2%") % current.to_account () % frontier.to_string ()));
}
next ();
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
@ -3224,9 +3217,9 @@ void nano::frontier_req_server::send_next ()
void nano::frontier_req_server::send_finished ()
{
std::vector<uint8_t> send_buffer;
{
send_buffer->clear ();
nano::vectorstream stream (*send_buffer);
nano::vectorstream stream (send_buffer);
nano::uint256_union zero (0);
write (stream, zero.bytes);
write (stream, zero.bytes);
@ -3236,7 +3229,7 @@ void nano::frontier_req_server::send_finished ()
{
connection->node->logger.try_log ("Frontier sending finished");
}
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->no_block_sent (ec, size_a);
});
}

View file

@ -344,7 +344,6 @@ public:
void no_block_sent (boost::system::error_code const &, size_t);
std::shared_ptr<nano::bootstrap_server> connection;
std::unique_ptr<nano::bulk_pull> request;
std::shared_ptr<std::vector<uint8_t>> send_buffer;
nano::block_hash current;
bool include_start;
nano::bulk_pull::count_t max_count;
@ -364,7 +363,6 @@ public:
void complete (boost::system::error_code const &, size_t);
std::shared_ptr<nano::bootstrap_server> connection;
std::unique_ptr<nano::bulk_pull_account> request;
std::shared_ptr<std::vector<uint8_t>> send_buffer;
std::unordered_set<nano::uint256_union> deduplication;
nano::pending_key current_key;
bool pending_address_only;
@ -396,7 +394,6 @@ public:
nano::account current;
nano::block_hash frontier;
std::unique_ptr<nano::frontier_req> request;
std::shared_ptr<std::vector<uint8_t>> send_buffer;
size_t count;
std::deque<std::pair<nano::account, nano::block_hash>> accounts;
};

View file

@ -2,6 +2,7 @@
#include <nano/boost/asio.hpp>
#include <nano/crypto_lib/random_pool.hpp>
#include <nano/lib/asio.hpp>
#include <nano/lib/config.hpp>
#include <nano/lib/memory.hpp>
#include <nano/secure/common.hpp>
@ -233,13 +234,17 @@ public:
virtual ~message () = default;
virtual void serialize (nano::stream &) const = 0;
virtual void visit (nano::message_visitor &) const = 0;
virtual std::shared_ptr<std::vector<uint8_t>> to_bytes () const
std::shared_ptr<std::vector<uint8_t>> to_bytes () const
{
auto bytes = std::make_shared<std::vector<uint8_t>> ();
nano::vectorstream stream (*bytes);
serialize (stream);
return bytes;
}
nano::shared_const_buffer to_shared_const_buffer () const
{
return shared_const_buffer (to_bytes ());
}
nano::message_header header;
};
class work_pool;

View file

@ -95,20 +95,17 @@ public:
// json and write the response to the ipc socket with a length prefix.
auto this_l (this->shared_from_this ());
auto response_handler_l ([this_l, request_id_l](std::string const & body) {
this_l->response_body = body;
this_l->size_response = boost::endian::native_to_big (static_cast<uint32_t> (this_l->response_body.size ()));
std::vector<boost::asio::mutable_buffer> bufs = {
boost::asio::buffer (&this_l->size_response, sizeof (this_l->size_response)),
boost::asio::buffer (this_l->response_body)
};
auto big = boost::endian::native_to_big (static_cast<uint32_t> (body.size ()));
std::vector<uint8_t> buffer;
buffer.insert (buffer.end (), reinterpret_cast<std::uint8_t *> (&big), reinterpret_cast<std::uint8_t *> (&big) + sizeof (std::uint32_t));
buffer.insert (buffer.end (), body.begin (), body.end ());
if (this_l->node.config.logging.log_ipc ())
{
this_l->node.logger.always_log (boost::str (boost::format ("IPC/RPC request %1% completed in: %2% %3%") % request_id_l % this_l->session_timer.stop ().count () % this_l->session_timer.unit ()));
}
this_l->timer_start (std::chrono::seconds (this_l->config_transport.io_timeout));
boost::asio::async_write (this_l->socket, bufs, [this_l](boost::system::error_code const & error_a, size_t size_a) {
nano::async_write (this_l->socket, nano::shared_const_buffer (buffer), [this_l](boost::system::error_code const & error_a, size_t size_a) {
this_l->timer_cancel ();
if (!error_a)
{
@ -201,10 +198,6 @@ private:
/** Buffer sizes are read into this */
uint32_t buffer_size{ 0 };
/** RPC response */
std::string response_body;
uint32_t size_response{ 0 };
/** Buffer used to store data received from the client */
std::vector<uint8_t> buffer;

View file

@ -59,7 +59,7 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, s
}
}
void nano::socket::async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a)
void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a)
{
auto this_l (shared_from_this ());
if (!closed)
@ -86,9 +86,9 @@ void nano::socket::async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a,
else
{
start_timer ();
boost::asio::async_write (tcp_socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()),
nano::async_write (tcp_socket, buffer_a,
boost::asio::bind_executor (strand,
[this_l, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) {
[this_l, callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node = this_l->node.lock ())
{
node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
@ -110,7 +110,7 @@ void nano::socket::write_queued_messages ()
std::weak_ptr<nano::socket> this_w (shared_from_this ());
auto msg (send_queue.front ());
start_timer ();
boost::asio::async_write (tcp_socket, boost::asio::buffer (msg.buffer->data (), msg.buffer->size ()),
nano::async_write (tcp_socket, msg.buffer,
boost::asio::bind_executor (strand,
[msg, this_w](boost::system::error_code ec, std::size_t size_a) {
if (auto this_l = this_w.lock ())

View file

@ -1,6 +1,8 @@
#pragma once
#include <nano/boost/asio.hpp>
#include <nano/lib/asio.hpp>
#include <nano/lib/utility.hpp>
#include <boost/optional.hpp>
@ -43,7 +45,7 @@ public:
virtual ~socket ();
void async_connect (boost::asio::ip::tcp::endpoint const &, std::function<void(boost::system::error_code const &)>);
void async_read (std::shared_ptr<std::vector<uint8_t>>, size_t, std::function<void(boost::system::error_code const &, size_t)>);
void async_write (std::shared_ptr<std::vector<uint8_t>>, std::function<void(boost::system::error_code const &, size_t)> = nullptr);
void async_write (nano::shared_const_buffer const &, std::function<void(boost::system::error_code const &, size_t)> = nullptr);
void close ();
boost::asio::ip::tcp::endpoint remote_endpoint () const;
@ -60,7 +62,7 @@ protected:
class queue_item
{
public:
std::shared_ptr<std::vector<uint8_t>> buffer;
nano::shared_const_buffer buffer;
std::function<void(boost::system::error_code const &, size_t)> callback;
};

View file

@ -41,20 +41,20 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const &
return result;
}
void nano::transport::channel_tcp::send_buffer (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
{
socket->async_write (buffer_a, tcp_callback (buffer_a, detail_a, socket->remote_endpoint (), callback_a));
socket->async_write (buffer_a, tcp_callback (detail_a, socket->remote_endpoint (), callback_a));
}
std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_tcp::callback (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_tcp::callback (nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
{
return callback_a;
}
std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_tcp::tcp_callback (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, nano::tcp_endpoint const & endpoint_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_tcp::tcp_callback (nano::stat::detail detail_a, nano::tcp_endpoint const & endpoint_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
{
// clang-format off
return [ buffer_a, endpoint_a, node = std::weak_ptr<nano::node> (node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a)
return [endpoint_a, node = std::weak_ptr<nano::node> (node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a)
{
if (auto node_l = node.lock ())
{
@ -489,7 +489,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
// TCP node ID handshake
auto cookie (node_l->network.syn_cookies.assign (endpoint_a));
nano::node_id_handshake message (cookie, boost::none);
auto bytes = message.to_bytes ();
auto bytes = message.to_shared_const_buffer ();
if (node_l->config.logging.network_node_id_handshake_logging ())
{
node_l->logger.try_log (boost::str (boost::format ("Node ID handshake request sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*cookie).to_string ()));
@ -558,7 +558,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
boost::optional<std::pair<nano::account, nano::signature>> response (std::make_pair (node_l->node_id.pub, nano::sign_message (node_l->node_id.prv, node_l->node_id.pub, *message.query)));
nano::node_id_handshake response_message (boost::none, response);
auto bytes = response_message.to_bytes ();
auto bytes = response_message.to_shared_const_buffer ();
if (node_l->config.logging.network_node_id_handshake_logging ())
{
node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ()));

View file

@ -27,9 +27,9 @@ namespace transport
~channel_tcp ();
size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) override;
std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
std::function<void(boost::system::error_code const &, size_t)> tcp_callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, nano::tcp_endpoint const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const;
void send_buffer (nano::shared_const_buffer const &, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) override;
std::function<void(boost::system::error_code const &, size_t)> callback (nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
std::function<void(boost::system::error_code const &, size_t)> tcp_callback (nano::stat::detail, nano::tcp_endpoint const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const;
std::string to_string () const override;
bool operator== (nano::transport::channel_tcp const & other_a) const
{

View file

@ -80,9 +80,9 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct
{
callback_visitor visitor;
message_a.visit (visitor);
auto buffer (message_a.to_bytes ());
auto buffer (message_a.to_shared_const_buffer ());
auto detail (visitor.result);
if (!is_droppable_a || !limiter.should_drop (buffer->size ()))
if (!is_droppable_a || !limiter.should_drop (buffer.size ()))
{
send_buffer (buffer, detail, callback_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
@ -93,7 +93,7 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct
if (node.config.logging.network_packet_logging ())
{
auto key = static_cast<uint8_t> (detail) << 8;
node.logger.always_log (boost::str (boost::format ("%1% of size %2% dropped") % node.stats.detail_to_string (key) % buffer->size ()));
node.logger.always_log (boost::str (boost::format ("%1% of size %2% dropped") % node.stats.detail_to_string (key) % buffer.size ()));
}
}
}

View file

@ -54,8 +54,8 @@ namespace transport
virtual size_t hash_code () const = 0;
virtual bool operator== (nano::transport::channel const &) const = 0;
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, bool const = true);
virtual void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) = 0;
virtual std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const = 0;
virtual void send_buffer (nano::shared_const_buffer const &, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) = 0;
virtual std::function<void(boost::system::error_code const &, size_t)> callback (nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const = 0;
virtual std::string to_string () const = 0;
virtual nano::endpoint get_endpoint () const = 0;
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;

View file

@ -29,16 +29,16 @@ bool nano::transport::channel_udp::operator== (nano::transport::channel const &
return result;
}
void nano::transport::channel_udp::send_buffer (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
void nano::transport::channel_udp::send_buffer (nano::shared_const_buffer const & buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
{
set_last_packet_sent (std::chrono::steady_clock::now ());
channels.send (boost::asio::const_buffer (buffer_a->data (), buffer_a->size ()), endpoint, callback (buffer_a, detail_a, callback_a));
channels.send (buffer_a, endpoint, callback (detail_a, callback_a));
}
std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_udp::callback (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_udp::callback (nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
{
// clang-format off
return [ buffer_a, node = std::weak_ptr<nano::node> (channels.node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a)
return [node = std::weak_ptr<nano::node> (channels.node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a)
{
if (auto node_l = node.lock ())
{
@ -80,7 +80,7 @@ socket (node_a.io_ctx, nano::endpoint (boost::asio::ip::address_v6::any (), port
local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), port);
}
void nano::transport::udp_channels::send (boost::asio::const_buffer buffer_a, nano::endpoint endpoint_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
void nano::transport::udp_channels::send (nano::shared_const_buffer const & buffer_a, nano::endpoint endpoint_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
{
boost::asio::post (strand,
[this, buffer_a, endpoint_a, callback_a]() {

View file

@ -27,8 +27,8 @@ namespace transport
channel_udp (nano::transport::udp_channels &, nano::endpoint const &, uint8_t protocol_version);
size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) override;
std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
void send_buffer (nano::shared_const_buffer const &, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) override;
std::function<void(boost::system::error_code const &, size_t)> callback (nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
std::string to_string () const override;
bool operator== (nano::transport::channel_udp const & other_a) const
{
@ -77,7 +77,7 @@ namespace transport
void receive ();
void start ();
void stop ();
void send (boost::asio::const_buffer buffer_a, nano::endpoint endpoint_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a);
void send (nano::shared_const_buffer const & buffer_a, nano::endpoint endpoint_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a);
nano::endpoint get_local_endpoint () const;
void receive_action (nano::message_buffer *);
void process_packets ();

View file

@ -244,14 +244,13 @@ void nano::websocket::session::write (nano::websocket::message message_a)
void nano::websocket::session::write_queued_messages ()
{
auto msg (send_queue.front ());
auto msg_str (msg.to_string ());
auto msg (send_queue.front ().to_string ());
auto this_l (shared_from_this ());
// clang-format off
ws.async_write (boost::asio::buffer (msg_str->data (), msg_str->size ()),
ws.async_write (nano::shared_const_buffer (msg),
boost::asio::bind_executor (strand,
[msg_str, this_l](boost::system::error_code ec, std::size_t bytes_transferred) {
[this_l](boost::system::error_code ec, std::size_t bytes_transferred) {
this_l->send_queue.pop_front ();
if (!ec)
{
@ -673,10 +672,10 @@ void nano::websocket::message_builder::set_common_fields (nano::websocket::messa
message_a.contents.add ("time", std::to_string (milli_since_epoch));
}
std::shared_ptr<std::string> nano::websocket::message::to_string () const
std::string nano::websocket::message::to_string () const
{
std::ostringstream ostream;
boost::property_tree::write_json (ostream, contents);
ostream.flush ();
return std::make_shared<std::string> (ostream.str ());
return ostream.str ();
}

View file

@ -70,7 +70,7 @@ namespace websocket
{
}
std::shared_ptr<std::string> to_string () const;
std::string to_string () const;
nano::websocket::topic topic;
boost::property_tree::ptree contents;
};

View file

@ -48,7 +48,6 @@ void nano::rpc_connection::write_result (std::string body, unsigned version, boo
else
{
assert (false && "RPC already responded and should only respond once");
// Guards `res' from being clobbered while async_write is being serviced
}
}

View file

@ -1,3 +1,4 @@
#include <nano/lib/asio.hpp>
#include <nano/lib/json_error_response.hpp>
#include <nano/rpc/rpc_request_processor.hpp>
@ -83,7 +84,7 @@ void nano::rpc_request_processor::make_available (nano::ipc_connection & connect
}
// Connection does not exist or has been closed, try to connect to it again and then resend IPC request
void nano::rpc_request_processor::try_reconnect_and_execute_request (std::shared_ptr<nano::ipc_connection> connection, std::shared_ptr<std::vector<uint8_t>> req, std::shared_ptr<std::vector<uint8_t>> res, std::shared_ptr<nano::rpc_request> rpc_request)
void nano::rpc_request_processor::try_reconnect_and_execute_request (std::shared_ptr<nano::ipc_connection> connection, nano::shared_const_buffer const & req, std::shared_ptr<std::vector<uint8_t>> res, std::shared_ptr<nano::rpc_request> rpc_request)
{
connection->client.async_connect (ipc_address, ipc_port, [this, connection, req, res, rpc_request](nano::error err) {
if (!err)

View file

@ -48,7 +48,7 @@ public:
private:
void run ();
void read_payload (std::shared_ptr<nano::ipc_connection> connection, std::shared_ptr<std::vector<uint8_t>> res, std::shared_ptr<nano::rpc_request> rpc_request);
void try_reconnect_and_execute_request (std::shared_ptr<nano::ipc_connection> connection, std::shared_ptr<std::vector<uint8_t>> req, std::shared_ptr<std::vector<uint8_t>> res, std::shared_ptr<nano::rpc_request> rpc_request);
void try_reconnect_and_execute_request (std::shared_ptr<nano::ipc_connection> connection, nano::shared_const_buffer const & req, std::shared_ptr<std::vector<uint8_t>> res, std::shared_ptr<nano::rpc_request> rpc_request);
void make_available (nano::ipc_connection & connection);
std::vector<std::shared_ptr<nano::ipc_connection>> connections;