ws close and async_read should go through the strand to be thread safe (#1935)

This commit is contained in:
Guilherme Lawless 2019-04-26 14:20:38 +01:00 committed by cryptocode
commit 8b0bc71b73
2 changed files with 20 additions and 12 deletions

View file

@ -98,7 +98,7 @@ bool nano::websocket::vote_options::should_filter (nano::websocket::message cons
}
nano::websocket::session::session (nano::websocket::listener & listener_a, socket_type socket_a) :
ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_executor ())
ws_listener (listener_a), ws (std::move (socket_a)), strand (ws.get_executor ())
{
ws.text (true);
ws_listener.get_node ().logger.try_log ("Websocket: session started");
@ -135,12 +135,17 @@ void nano::websocket::session::handshake ()
void nano::websocket::session::close ()
{
// clang-format off
std::lock_guard<std::mutex> lk (io_mutex);
boost::beast::websocket::close_reason reason;
reason.code = boost::beast::websocket::close_code::normal;
reason.reason = "Shutting down";
boost::system::error_code ec_ignore;
ws.close (reason, ec_ignore);
boost::asio::post (strand,
[self_l = shared_from_this ()]() {
boost::beast::websocket::close_reason reason;
reason.code = boost::beast::websocket::close_code::normal;
reason.reason = "Shutting down";
boost::system::error_code ec_ignore;
self_l->ws.close (reason, ec_ignore);
});
// clang-format on
}
void nano::websocket::session::write (nano::websocket::message message_a)
@ -151,7 +156,7 @@ void nano::websocket::session::write (nano::websocket::message message_a)
if (message_a.topic == nano::websocket::topic::ack || (subscription != subscriptions.end () && !subscription->second->should_filter (message_a)))
{
lk.unlock ();
boost::asio::post (write_strand,
boost::asio::post (strand,
[message_a, self_l = shared_from_this ()]() {
bool write_in_progress = !self_l->send_queue.empty ();
self_l->send_queue.emplace_back (message_a);
@ -172,7 +177,7 @@ void nano::websocket::session::write_queued_messages ()
std::lock_guard<std::mutex> lk (io_mutex);
ws.async_write (boost::asio::buffer (msg_str.data (), msg_str.size ()),
boost::asio::bind_executor (write_strand,
boost::asio::bind_executor (strand,
[msg, self_l = shared_from_this ()](boost::system::error_code ec, std::size_t bytes_transferred) {
self_l->send_queue.pop_front ();
if (!ec)
@ -188,8 +193,10 @@ void nano::websocket::session::write_queued_messages ()
void nano::websocket::session::read ()
{
// clang-format off
std::lock_guard<std::mutex> lk (io_mutex);
ws.async_read (read_buffer,
boost::asio::bind_executor (strand,
[self_l = shared_from_this ()](boost::system::error_code ec, std::size_t bytes_transferred) {
if (!ec)
{
@ -216,7 +223,8 @@ void nano::websocket::session::read ()
{
self_l->ws_listener.get_node ().logger.try_log ("Websocket: read failed: ", ec.message ());
}
});
}));
// clang-format on
}
namespace

View file

@ -176,9 +176,9 @@ namespace websocket
boost::beast::websocket::stream<socket_type> ws;
/** Buffer for received messages */
boost::beast::multi_buffer read_buffer;
/** All websocket writes and updates to send_queue must go through the write strand. */
boost::asio::strand<boost::asio::io_context::executor_type> write_strand;
/** Outgoing messages. The send queue is protected by accessing it only through the write strand */
/** All websocket operations that are thread unsafe must go through a strand. */
boost::asio::strand<boost::asio::io_context::executor_type> strand;
/** Outgoing messages. The send queue is protected by accessing it only through the strand */
std::deque<message> send_queue;
/** Serialize calls to websocket::stream initiating functions */
std::mutex io_mutex;