From 69bd746b17b62f5fc2cf36d9450769b0cf367c9b Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Wed, 8 Jul 2020 15:13:04 +0100 Subject: [PATCH] Invoke callbacks even during socket closures (#2828) * Invoke callbacks even during socket closures * Gui comments --- nano/node/socket.cpp | 79 +++++++++++++++++++++++-------- nano/node/socket.hpp | 3 +- nano/node/transport/tcp.cpp | 6 +++ nano/node/transport/transport.cpp | 4 +- 4 files changed, 71 insertions(+), 21 deletions(-) diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index 2dd958c1..4027da91 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -72,7 +72,7 @@ void nano::socket::async_read (std::shared_ptr> buffer_a, s } } -void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a, nano::buffer_drop_policy drop_policy_a) +void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::buffer_drop_policy drop_policy_a) { auto this_l (shared_from_this ()); if (!closed) @@ -80,32 +80,42 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std: if (writer_concurrency == nano::socket::concurrency::multi_writer) { boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l, drop_policy_a]() { - bool write_in_progress = !this_l->send_queue.empty (); - auto queue_size = this_l->send_queue.size (); - if (queue_size < this_l->queue_size_max || (drop_policy_a == nano::buffer_drop_policy::no_socket_drop && queue_size < (this_l->queue_size_max * 2))) + if (!this_l->closed) { - this_l->send_queue.emplace_back (nano::socket::queue_item{ buffer_a, callback_a }); - } - else if (auto node_l = this_l->node.lock ()) - { - if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop) + bool write_in_progress = !this_l->send_queue.empty (); + auto queue_size = this_l->send_queue.size (); + if (queue_size < this_l->queue_size_max || (drop_policy_a == nano::buffer_drop_policy::no_socket_drop && queue_size < (this_l->queue_size_max * 2))) { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); + this_l->send_queue.emplace_back (nano::socket::queue_item{ buffer_a, callback_a }); } - else + else if (auto node_l = this_l->node.lock ()) { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); - } + if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop) + { + node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); + } + else + { + node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); + } + if (callback_a) + { + callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); + } + } + if (!write_in_progress) + { + this_l->write_queued_messages (); + } + } + else + { if (callback_a) { - callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); + callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); } } - if (!write_in_progress) - { - this_l->write_queued_messages (); - } })); } else @@ -126,6 +136,15 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std: })); } } + else if (callback_a) + { + if (auto node = this_l->node.lock ()) + { + node->background ([callback_a]() { + callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); + }); + } + } } void nano::socket::write_queued_messages () @@ -164,6 +183,10 @@ void nano::socket::write_queued_messages () this_l->start_timer (node->network_params.node.idle_timeout); } } + else if (msg.callback) + { + msg.callback (ec, size_a); + } } } })); @@ -245,6 +268,24 @@ void nano::socket::close () })); } +void nano::socket::flush_send_queue_callbacks () +{ + while (!send_queue.empty ()) + { + auto & item = send_queue.front (); + if (item.callback) + { + if (auto node_l = node.lock ()) + { + node_l->background ([callback = std::move (item.callback)]() { + callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); + }); + } + } + send_queue.pop_front (); + } +} + // This must be called from a strand or the destructor void nano::socket::close_internal () { @@ -256,7 +297,7 @@ void nano::socket::close_internal () // Ignore error code for shutdown as it is best-effort tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); tcp_socket.close (ec); - send_queue.clear (); + flush_send_queue_callbacks (); if (ec) { if (auto node_l = node.lock ()) diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp index 14a56b72..399f9373 100644 --- a/nano/node/socket.hpp +++ b/nano/node/socket.hpp @@ -55,7 +55,7 @@ public: virtual ~socket (); void async_connect (boost::asio::ip::tcp::endpoint const &, std::function); void async_read (std::shared_ptr>, size_t, std::function); - void async_write (nano::shared_const_buffer const &, std::function = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter); + void async_write (nano::shared_const_buffer const &, std::function const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter); void close (); boost::asio::ip::tcp::endpoint remote_endpoint () const; @@ -102,6 +102,7 @@ protected: void start_timer (); void stop_timer (); void checkup (); + void flush_send_queue_callbacks (); }; /** Socket class for TCP servers */ diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 3d2fe642..3ab9e4fe 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -56,6 +56,12 @@ void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const { socket_l->async_write (buffer_a, tcp_callback (detail_a, socket_l->remote_endpoint (), callback_a), drop_policy_a); } + else if (callback_a) + { + node.background ([callback_a]() { + callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); + }); + } } std::function nano::transport::channel_tcp::callback (nano::stat::detail detail_a, std::function const & callback_a) const diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 3b5c7e2b..26bb8d9d 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -102,7 +102,9 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct { if (callback_a) { - callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); + node.background ([callback_a]() { + callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); + }); } node.stats.inc (nano::stat::type::drop, detail, nano::stat::dir::out);