WebSocket fixes and improvements (#2007)

This commit is contained in:
cryptocode 2019-05-21 21:43:49 +02:00 committed by GitHub
commit fb093ff189
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 68 deletions

View file

@ -55,12 +55,12 @@ boost::optional<std::string> websocket_test_call (std::string host, std::string
if (await_response)
{
assert (response_deadline > 0s);
boost::beast::flat_buffer buffer;
ws.async_read (buffer, [&ret, &buffer](boost::beast::error_code const & ec, std::size_t const n) {
auto buffer (std::make_shared<boost::beast::flat_buffer> ());
ws.async_read (*buffer, [&ret, buffer](boost::beast::error_code const & ec, std::size_t const n) {
if (!ec)
{
std::ostringstream res;
res << beast_buffers (buffer.data ());
res << beast_buffers (buffer->data ());
ret = res.str ();
}
});
@ -109,7 +109,6 @@ TEST (websocket, confirmation)
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");
confirmation_event_received = true;
});
client_thread.detach ();
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
@ -140,6 +139,7 @@ TEST (websocket, confirmation)
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
client_thread.join ();
std::atomic<bool> unsubscribe_ack_received{ false };
std::thread client_thread_2 ([&unsubscribe_ack_received]() {
@ -157,7 +157,6 @@ TEST (websocket, confirmation)
R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, true, 1s);
unsubscribe_ack_received = true;
});
client_thread_2.detach ();
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
@ -182,6 +181,7 @@ TEST (websocket, confirmation)
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
client_thread_2.join ();
node1->stop ();
}
@ -215,7 +215,6 @@ TEST (websocket, confirmation_options)
ASSERT_FALSE (response);
client_thread_finished = true;
});
client_thread.detach ();
// Wait for subscribe acknowledgement
system.deadline_set (5s);
@ -260,7 +259,6 @@ TEST (websocket, confirmation_options)
client_thread_2_finished = true;
});
client_thread_2.detach ();
// Wait for the subscribe action to be acknowledged
system.deadline_set (5s);
@ -296,7 +294,6 @@ TEST (websocket, confirmation_options)
ASSERT_FALSE (response);
client_thread_3_finished = true;
});
client_thread_3.detach ();
// Confirm a legacy block
// When filtering options are enabled, legacy blocks are always filtered
@ -315,6 +312,9 @@ TEST (websocket, confirmation_options)
}
ack_ready = false;
client_thread.join ();
client_thread_2.join ();
client_thread_3.join ();
node1->stop ();
}
@ -353,7 +353,6 @@ TEST (websocket, vote)
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
client_thread_finished = true;
});
client_thread.detach ();
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
@ -379,6 +378,7 @@ TEST (websocket, vote)
ASSERT_NO_ERROR (system.poll ());
}
client_thread.join ();
node1->stop ();
}
@ -418,7 +418,6 @@ TEST (websocket, vote_options)
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
client_thread_finished = true;
});
client_thread.detach ();
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
@ -459,7 +458,6 @@ TEST (websocket, vote_options)
ASSERT_FALSE (response);
client_thread_2_finished = true;
});
client_thread_2.detach ();
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
@ -481,5 +479,7 @@ TEST (websocket, vote_options)
ASSERT_NO_ERROR (system.poll ());
}
client_thread.join ();
client_thread_2.join ();
node1->stop ();
}

View file

@ -113,37 +113,37 @@ nano::websocket::session::~session ()
ws_listener.decrease_subscription_count (subscription.first);
}
}
ws_listener.get_node ().logger.try_log ("Websocket: session ended");
}
void nano::websocket::session::handshake ()
{
std::lock_guard<std::mutex> lk (io_mutex);
ws.async_accept ([self_l = shared_from_this ()](boost::system::error_code const & ec) {
auto this_l (shared_from_this ());
ws.async_accept ([this_l](boost::system::error_code const & ec) {
if (!ec)
{
// Start reading incoming messages
self_l->read ();
this_l->read ();
}
else
{
self_l->ws_listener.get_node ().logger.always_log ("Websocket: handshake failed: ", ec.message ());
this_l->ws_listener.get_node ().logger.always_log ("Websocket: handshake failed: ", ec.message ());
}
});
}
void nano::websocket::session::close ()
{
ws_listener.get_node ().logger.try_log ("Websocket: session closing");
auto this_l (shared_from_this ());
// clang-format off
std::lock_guard<std::mutex> lk (io_mutex);
boost::asio::post (strand,
[self_l = shared_from_this ()]() {
boost::asio::dispatch (strand,
[this_l]() {
boost::beast::websocket::close_reason reason;
reason.code = boost::beast::websocket::close_code::normal;
reason.reason = "Shutting down";
boost::system::error_code ec_ignore;
self_l->ws.close (reason, ec_ignore);
this_l->ws.close (reason, ec_ignore);
});
// clang-format on
}
@ -156,13 +156,14 @@ void nano::websocket::session::write (nano::websocket::message message_a)
if (message_a.topic == nano::websocket::topic::ack || (subscription != subscriptions.end () && !subscription->second->should_filter (message_a)))
{
lk.unlock ();
auto this_l (shared_from_this ());
boost::asio::post (strand,
[message_a, self_l = shared_from_this ()]() {
bool write_in_progress = !self_l->send_queue.empty ();
self_l->send_queue.emplace_back (message_a);
[message_a, this_l]() {
bool write_in_progress = !this_l->send_queue.empty ();
this_l->send_queue.emplace_back (message_a);
if (!write_in_progress)
{
self_l->write_queued_messages ();
this_l->write_queued_messages ();
}
});
}
@ -171,20 +172,20 @@ void nano::websocket::session::write (nano::websocket::message message_a)
void nano::websocket::session::write_queued_messages ()
{
// clang-format off
auto msg (send_queue.front ());
auto msg_str (msg.to_string ());
auto this_l (shared_from_this ());
std::lock_guard<std::mutex> lk (io_mutex);
ws.async_write (boost::asio::buffer (msg_str.data (), msg_str.size ()),
// clang-format off
ws.async_write (boost::asio::buffer (msg_str->data (), msg_str->size ()),
boost::asio::bind_executor (strand,
[msg, self_l = shared_from_this ()](boost::system::error_code ec, std::size_t bytes_transferred) {
self_l->send_queue.pop_front ();
[msg_str, this_l](boost::system::error_code ec, std::size_t bytes_transferred) {
this_l->send_queue.pop_front ();
if (!ec)
{
if (!self_l->send_queue.empty ())
if (!this_l->send_queue.empty ())
{
self_l->write_queued_messages ();
this_l->write_queued_messages ();
}
}
}));
@ -193,37 +194,40 @@ void nano::websocket::session::write_queued_messages ()
void nano::websocket::session::read ()
{
auto this_l (shared_from_this ());
// clang-format off
std::lock_guard<std::mutex> lk (io_mutex);
ws.async_read (read_buffer,
boost::asio::bind_executor (strand,
[self_l = shared_from_this ()](boost::system::error_code ec, std::size_t bytes_transferred) {
if (!ec)
{
std::stringstream os;
os << beast_buffers (self_l->read_buffer.data ());
std::string incoming_message = os.str ();
// Prepare next read by clearing the multibuffer
self_l->read_buffer.consume (self_l->read_buffer.size ());
boost::property_tree::ptree tree_msg;
try
boost::asio::post (strand, [this_l]() {
this_l->ws.async_read (this_l->read_buffer,
boost::asio::bind_executor (this_l->strand,
[this_l](boost::system::error_code ec, std::size_t bytes_transferred) {
if (!ec)
{
boost::property_tree::read_json (os, tree_msg);
self_l->handle_message (tree_msg);
self_l->read ();
std::stringstream os;
os << beast_buffers (this_l->read_buffer.data ());
std::string incoming_message = os.str ();
// Prepare next read by clearing the multibuffer
this_l->read_buffer.consume (this_l->read_buffer.size ());
boost::property_tree::ptree tree_msg;
try
{
boost::property_tree::read_json (os, tree_msg);
this_l->handle_message (tree_msg);
this_l->read ();
}
catch (boost::property_tree::json_parser::json_parser_error const & ex)
{
this_l->ws_listener.get_node ().logger.try_log ("Websocket: json parsing failed: ", ex.what ());
}
}
catch (boost::property_tree::json_parser::json_parser_error const & ex)
else
{
self_l->ws_listener.get_node ().logger.try_log ("Websocket: json parsing failed: ", ex.what ());
this_l->ws_listener.get_node ().logger.try_log ("Websocket: read failed: ", ec.message ());
}
}
else
{
self_l->ws_listener.get_node ().logger.try_log ("Websocket: read failed: ", ec.message ());
}
}));
}));
});
// clang-format on
}
@ -326,6 +330,7 @@ void nano::websocket::listener::stop ()
stopped = true;
acceptor.close ();
std::lock_guard<std::mutex> lk (sessions_mutex);
for (auto & weak_session : sessions)
{
auto session_ptr (weak_session.lock ());
@ -334,6 +339,7 @@ void nano::websocket::listener::stop ()
session_ptr->close ();
}
}
sessions.clear ();
}
nano::websocket::listener::listener (nano::node & node_a, boost::asio::ip::tcp::endpoint endpoint_a) :
@ -364,9 +370,10 @@ void nano::websocket::listener::run ()
void nano::websocket::listener::accept ()
{
auto this_l (shared_from_this ());
acceptor.async_accept (socket,
[self_l = shared_from_this ()](boost::system::error_code const & ec) {
self_l->on_accept (ec);
[this_l](boost::system::error_code const & ec) {
this_l->on_accept (ec);
});
}
@ -382,6 +389,8 @@ void nano::websocket::listener::on_accept (boost::system::error_code ec)
auto session (std::make_shared<nano::websocket::session> (*this, std::move (socket)));
sessions_mutex.lock ();
sessions.push_back (session);
// Clean up expired sessions
sessions.erase (std::remove_if (sessions.begin (), sessions.end (), [](auto & elem) { return elem.expired (); }), sessions.end ());
sessions_mutex.unlock ();
session->handshake ();
}
@ -403,9 +412,6 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a)
session_ptr->write (message_a);
}
}
// Clean up expired sessions
sessions.erase (std::remove_if (sessions.begin (), sessions.end (), [](auto & elem) { return elem.expired (); }), sessions.end ());
}
bool nano::websocket::listener::any_subscribers (nano::websocket::topic const & topic_a)
@ -469,10 +475,10 @@ void nano::websocket::message_builder::set_common_fields (nano::websocket::messa
message_a.contents.add ("time", std::to_string (milli_since_epoch));
}
std::string nano::websocket::message::to_string () const
std::shared_ptr<std::string> nano::websocket::message::to_string () const
{
std::ostringstream ostream;
boost::property_tree::write_json (ostream, contents);
ostream.flush ();
return ostream.str ();
return std::make_shared<std::string> (ostream.str ());
}

View file

@ -66,7 +66,7 @@ namespace websocket
{
}
std::string to_string () const;
std::shared_ptr<std::string> to_string () const;
nano::websocket::topic topic;
boost::property_tree::ptree contents;
};
@ -180,8 +180,6 @@ namespace websocket
boost::asio::strand<boost::asio::io_context::executor_type> strand;
/** Outgoing messages. The send queue is protected by accessing it only through the strand */
std::deque<message> send_queue;
/** Serialize calls to websocket::stream initiating functions */
std::mutex io_mutex;
/** Hash functor for topic enums */
struct topic_hash