Invoke callbacks even during socket closures (#2828)

* Invoke callbacks even during socket closures

* Gui comments
This commit is contained in:
Wesley Shillingford 2020-07-08 15:13:04 +01:00 committed by GitHub
commit 69bd746b17
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 71 additions and 21 deletions

View file

@ -72,7 +72,7 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, s
}
}
void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a, nano::buffer_drop_policy drop_policy_a)
void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
{
auto this_l (shared_from_this ());
if (!closed)
@ -80,32 +80,42 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:
if (writer_concurrency == nano::socket::concurrency::multi_writer)
{
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l, drop_policy_a]() {
bool write_in_progress = !this_l->send_queue.empty ();
auto queue_size = this_l->send_queue.size ();
if (queue_size < this_l->queue_size_max || (drop_policy_a == nano::buffer_drop_policy::no_socket_drop && queue_size < (this_l->queue_size_max * 2)))
if (!this_l->closed)
{
this_l->send_queue.emplace_back (nano::socket::queue_item{ buffer_a, callback_a });
}
else if (auto node_l = this_l->node.lock ())
{
if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop)
bool write_in_progress = !this_l->send_queue.empty ();
auto queue_size = this_l->send_queue.size ();
if (queue_size < this_l->queue_size_max || (drop_policy_a == nano::buffer_drop_policy::no_socket_drop && queue_size < (this_l->queue_size_max * 2)))
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
this_l->send_queue.emplace_back (nano::socket::queue_item{ buffer_a, callback_a });
}
else
else if (auto node_l = this_l->node.lock ())
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
}
if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
}
else
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
}
if (callback_a)
{
callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0);
}
}
if (!write_in_progress)
{
this_l->write_queued_messages ();
}
}
else
{
if (callback_a)
{
callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0);
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
}
}
if (!write_in_progress)
{
this_l->write_queued_messages ();
}
}));
}
else
@ -126,6 +136,15 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:
}));
}
}
else if (callback_a)
{
if (auto node = this_l->node.lock ())
{
node->background ([callback_a]() {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
}
}
void nano::socket::write_queued_messages ()
@ -164,6 +183,10 @@ void nano::socket::write_queued_messages ()
this_l->start_timer (node->network_params.node.idle_timeout);
}
}
else if (msg.callback)
{
msg.callback (ec, size_a);
}
}
}
}));
@ -245,6 +268,24 @@ void nano::socket::close ()
}));
}
void nano::socket::flush_send_queue_callbacks ()
{
while (!send_queue.empty ())
{
auto & item = send_queue.front ();
if (item.callback)
{
if (auto node_l = node.lock ())
{
node_l->background ([callback = std::move (item.callback)]() {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
}
send_queue.pop_front ();
}
}
// This must be called from a strand or the destructor
void nano::socket::close_internal ()
{
@ -256,7 +297,7 @@ void nano::socket::close_internal ()
// Ignore error code for shutdown as it is best-effort
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
tcp_socket.close (ec);
send_queue.clear ();
flush_send_queue_callbacks ();
if (ec)
{
if (auto node_l = node.lock ())

View file

@ -55,7 +55,7 @@ public:
virtual ~socket ();
void async_connect (boost::asio::ip::tcp::endpoint const &, std::function<void(boost::system::error_code const &)>);
void async_read (std::shared_ptr<std::vector<uint8_t>>, size_t, std::function<void(boost::system::error_code const &, size_t)>);
void async_write (nano::shared_const_buffer const &, std::function<void(boost::system::error_code const &, size_t)> = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter);
void async_write (nano::shared_const_buffer const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter);
void close ();
boost::asio::ip::tcp::endpoint remote_endpoint () const;
@ -102,6 +102,7 @@ protected:
void start_timer ();
void stop_timer ();
void checkup ();
void flush_send_queue_callbacks ();
};
/** Socket class for TCP servers */

View file

@ -56,6 +56,12 @@ void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const
{
socket_l->async_write (buffer_a, tcp_callback (detail_a, socket_l->remote_endpoint (), callback_a), drop_policy_a);
}
else if (callback_a)
{
node.background ([callback_a]() {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
}
std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_tcp::callback (nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const

View file

@ -102,7 +102,9 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct
{
if (callback_a)
{
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
node.background ([callback_a]() {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
node.stats.inc (nano::stat::type::drop, detail, nano::stat::dir::out);