From fb093ff1896990342448990b2446bdf0de73a901 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Tue, 21 May 2019 21:43:49 +0200 Subject: [PATCH] WebSocket fixes and improvements (#2007) --- nano/core_test/websocket.cpp | 22 +++---- nano/node/websocket.cpp | 114 ++++++++++++++++++----------------- nano/node/websocket.hpp | 4 +- 3 files changed, 72 insertions(+), 68 deletions(-) diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index c0e1ecfe5..aa2497f4d 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -55,12 +55,12 @@ boost::optional websocket_test_call (std::string host, std::string if (await_response) { assert (response_deadline > 0s); - boost::beast::flat_buffer buffer; - ws.async_read (buffer, [&ret, &buffer](boost::beast::error_code const & ec, std::size_t const n) { + auto buffer (std::make_shared ()); + ws.async_read (*buffer, [&ret, buffer](boost::beast::error_code const & ec, std::size_t const n) { if (!ec) { std::ostringstream res; - res << beast_buffers (buffer.data ()); + res << beast_buffers (buffer->data ()); ret = res.str (); } }); @@ -109,7 +109,6 @@ TEST (websocket, confirmation) ASSERT_EQ (event.get ("topic"), "confirmation"); confirmation_event_received = true; }); - client_thread.detach (); // Wait for the subscription to be acknowledged system.deadline_set (5s); @@ -140,6 +139,7 @@ TEST (websocket, confirmation) ASSERT_NO_ERROR (system.poll ()); } ack_ready = false; + client_thread.join (); std::atomic unsubscribe_ack_received{ false }; std::thread client_thread_2 ([&unsubscribe_ack_received]() { @@ -157,7 +157,6 @@ TEST (websocket, confirmation) R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, true, 1s); unsubscribe_ack_received = true; }); - client_thread_2.detach (); // Wait for the subscription to be acknowledged system.deadline_set (5s); @@ -182,6 +181,7 @@ TEST (websocket, confirmation) ASSERT_NO_ERROR (system.poll ()); } ack_ready = false; + client_thread_2.join (); node1->stop (); } @@ -215,7 +215,6 @@ TEST (websocket, confirmation_options) ASSERT_FALSE (response); client_thread_finished = true; }); - client_thread.detach (); // Wait for subscribe acknowledgement system.deadline_set (5s); @@ -260,7 +259,6 @@ TEST (websocket, confirmation_options) client_thread_2_finished = true; }); - client_thread_2.detach (); // Wait for the subscribe action to be acknowledged system.deadline_set (5s); @@ -296,7 +294,6 @@ TEST (websocket, confirmation_options) ASSERT_FALSE (response); client_thread_3_finished = true; }); - client_thread_3.detach (); // Confirm a legacy block // When filtering options are enabled, legacy blocks are always filtered @@ -315,6 +312,9 @@ TEST (websocket, confirmation_options) } ack_ready = false; + client_thread.join (); + client_thread_2.join (); + client_thread_3.join (); node1->stop (); } @@ -353,7 +353,6 @@ TEST (websocket, vote) ASSERT_EQ (event.get ("topic"), "vote"); client_thread_finished = true; }); - client_thread.detach (); // Wait for the subscription to be acknowledged system.deadline_set (5s); @@ -379,6 +378,7 @@ TEST (websocket, vote) ASSERT_NO_ERROR (system.poll ()); } + client_thread.join (); node1->stop (); } @@ -418,7 +418,6 @@ TEST (websocket, vote_options) ASSERT_EQ (event.get ("topic"), "vote"); client_thread_finished = true; }); - client_thread.detach (); // Wait for the subscription to be acknowledged system.deadline_set (5s); @@ -459,7 +458,6 @@ TEST (websocket, vote_options) ASSERT_FALSE (response); client_thread_2_finished = true; }); - client_thread_2.detach (); // Wait for the subscription to be acknowledged system.deadline_set (5s); @@ -481,5 +479,7 @@ TEST (websocket, vote_options) ASSERT_NO_ERROR (system.poll ()); } + client_thread.join (); + client_thread_2.join (); node1->stop (); } diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 2d6faca1a..4d8020753 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -113,37 +113,37 @@ nano::websocket::session::~session () ws_listener.decrease_subscription_count (subscription.first); } } - - ws_listener.get_node ().logger.try_log ("Websocket: session ended"); } void nano::websocket::session::handshake () { - std::lock_guard lk (io_mutex); - ws.async_accept ([self_l = shared_from_this ()](boost::system::error_code const & ec) { + auto this_l (shared_from_this ()); + ws.async_accept ([this_l](boost::system::error_code const & ec) { if (!ec) { // Start reading incoming messages - self_l->read (); + this_l->read (); } else { - self_l->ws_listener.get_node ().logger.always_log ("Websocket: handshake failed: ", ec.message ()); + this_l->ws_listener.get_node ().logger.always_log ("Websocket: handshake failed: ", ec.message ()); } }); } void nano::websocket::session::close () { + ws_listener.get_node ().logger.try_log ("Websocket: session closing"); + + auto this_l (shared_from_this ()); // clang-format off - std::lock_guard lk (io_mutex); - boost::asio::post (strand, - [self_l = shared_from_this ()]() { + boost::asio::dispatch (strand, + [this_l]() { boost::beast::websocket::close_reason reason; reason.code = boost::beast::websocket::close_code::normal; reason.reason = "Shutting down"; boost::system::error_code ec_ignore; - self_l->ws.close (reason, ec_ignore); + this_l->ws.close (reason, ec_ignore); }); // clang-format on } @@ -156,13 +156,14 @@ void nano::websocket::session::write (nano::websocket::message message_a) if (message_a.topic == nano::websocket::topic::ack || (subscription != subscriptions.end () && !subscription->second->should_filter (message_a))) { lk.unlock (); + auto this_l (shared_from_this ()); boost::asio::post (strand, - [message_a, self_l = shared_from_this ()]() { - bool write_in_progress = !self_l->send_queue.empty (); - self_l->send_queue.emplace_back (message_a); + [message_a, this_l]() { + bool write_in_progress = !this_l->send_queue.empty (); + this_l->send_queue.emplace_back (message_a); if (!write_in_progress) { - self_l->write_queued_messages (); + this_l->write_queued_messages (); } }); } @@ -171,20 +172,20 @@ void nano::websocket::session::write (nano::websocket::message message_a) void nano::websocket::session::write_queued_messages () { - // clang-format off auto msg (send_queue.front ()); auto msg_str (msg.to_string ()); + auto this_l (shared_from_this ()); - std::lock_guard lk (io_mutex); - ws.async_write (boost::asio::buffer (msg_str.data (), msg_str.size ()), + // clang-format off + ws.async_write (boost::asio::buffer (msg_str->data (), msg_str->size ()), boost::asio::bind_executor (strand, - [msg, self_l = shared_from_this ()](boost::system::error_code ec, std::size_t bytes_transferred) { - self_l->send_queue.pop_front (); + [msg_str, this_l](boost::system::error_code ec, std::size_t bytes_transferred) { + this_l->send_queue.pop_front (); if (!ec) { - if (!self_l->send_queue.empty ()) + if (!this_l->send_queue.empty ()) { - self_l->write_queued_messages (); + this_l->write_queued_messages (); } } })); @@ -193,37 +194,40 @@ void nano::websocket::session::write_queued_messages () void nano::websocket::session::read () { + auto this_l (shared_from_this ()); + // clang-format off - std::lock_guard lk (io_mutex); - ws.async_read (read_buffer, - boost::asio::bind_executor (strand, - [self_l = shared_from_this ()](boost::system::error_code ec, std::size_t bytes_transferred) { - if (!ec) - { - std::stringstream os; - os << beast_buffers (self_l->read_buffer.data ()); - std::string incoming_message = os.str (); - - // Prepare next read by clearing the multibuffer - self_l->read_buffer.consume (self_l->read_buffer.size ()); - - boost::property_tree::ptree tree_msg; - try + boost::asio::post (strand, [this_l]() { + this_l->ws.async_read (this_l->read_buffer, + boost::asio::bind_executor (this_l->strand, + [this_l](boost::system::error_code ec, std::size_t bytes_transferred) { + if (!ec) { - boost::property_tree::read_json (os, tree_msg); - self_l->handle_message (tree_msg); - self_l->read (); + std::stringstream os; + os << beast_buffers (this_l->read_buffer.data ()); + std::string incoming_message = os.str (); + + // Prepare next read by clearing the multibuffer + this_l->read_buffer.consume (this_l->read_buffer.size ()); + + boost::property_tree::ptree tree_msg; + try + { + boost::property_tree::read_json (os, tree_msg); + this_l->handle_message (tree_msg); + this_l->read (); + } + catch (boost::property_tree::json_parser::json_parser_error const & ex) + { + this_l->ws_listener.get_node ().logger.try_log ("Websocket: json parsing failed: ", ex.what ()); + } } - catch (boost::property_tree::json_parser::json_parser_error const & ex) + else { - self_l->ws_listener.get_node ().logger.try_log ("Websocket: json parsing failed: ", ex.what ()); + this_l->ws_listener.get_node ().logger.try_log ("Websocket: read failed: ", ec.message ()); } - } - else - { - self_l->ws_listener.get_node ().logger.try_log ("Websocket: read failed: ", ec.message ()); - } - })); + })); + }); // clang-format on } @@ -326,6 +330,7 @@ void nano::websocket::listener::stop () stopped = true; acceptor.close (); + std::lock_guard lk (sessions_mutex); for (auto & weak_session : sessions) { auto session_ptr (weak_session.lock ()); @@ -334,6 +339,7 @@ void nano::websocket::listener::stop () session_ptr->close (); } } + sessions.clear (); } nano::websocket::listener::listener (nano::node & node_a, boost::asio::ip::tcp::endpoint endpoint_a) : @@ -364,9 +370,10 @@ void nano::websocket::listener::run () void nano::websocket::listener::accept () { + auto this_l (shared_from_this ()); acceptor.async_accept (socket, - [self_l = shared_from_this ()](boost::system::error_code const & ec) { - self_l->on_accept (ec); + [this_l](boost::system::error_code const & ec) { + this_l->on_accept (ec); }); } @@ -382,6 +389,8 @@ void nano::websocket::listener::on_accept (boost::system::error_code ec) auto session (std::make_shared (*this, std::move (socket))); sessions_mutex.lock (); sessions.push_back (session); + // Clean up expired sessions + sessions.erase (std::remove_if (sessions.begin (), sessions.end (), [](auto & elem) { return elem.expired (); }), sessions.end ()); sessions_mutex.unlock (); session->handshake (); } @@ -403,9 +412,6 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a) session_ptr->write (message_a); } } - - // Clean up expired sessions - sessions.erase (std::remove_if (sessions.begin (), sessions.end (), [](auto & elem) { return elem.expired (); }), sessions.end ()); } bool nano::websocket::listener::any_subscribers (nano::websocket::topic const & topic_a) @@ -469,10 +475,10 @@ void nano::websocket::message_builder::set_common_fields (nano::websocket::messa message_a.contents.add ("time", std::to_string (milli_since_epoch)); } -std::string nano::websocket::message::to_string () const +std::shared_ptr nano::websocket::message::to_string () const { std::ostringstream ostream; boost::property_tree::write_json (ostream, contents); ostream.flush (); - return ostream.str (); + return std::make_shared (ostream.str ()); } diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index f92c48c8f..e352b148e 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -66,7 +66,7 @@ namespace websocket { } - std::string to_string () const; + std::shared_ptr to_string () const; nano::websocket::topic topic; boost::property_tree::ptree contents; }; @@ -180,8 +180,6 @@ namespace websocket boost::asio::strand strand; /** Outgoing messages. The send queue is protected by accessing it only through the strand */ std::deque send_queue; - /** Serialize calls to websocket::stream initiating functions */ - std::mutex io_mutex; /** Hash functor for topic enums */ struct topic_hash