From 80370ae4fd7ac439d4083347a194d50c3673d845 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Sat, 25 May 2019 15:35:02 +0100 Subject: [PATCH] Websockets: prevent re-sub (#2001) * Differentiate between new subscription and an update, increasing subscription count only in the first case * EOF is clean closed connection, no need to log as error * Rename some functions, make increase/decrease subscriber count methods private (session is now a friend) * Fix and add a getter for subscriber count per topic * Fix subscriber counts * Fix tests and add a new one to test re-sub or un-sub when no sub exists * Add temporary fix for crashing when running in debug * Remove sleep, works with detaching but hangs without * Use async_close to avoid getting stuck on close() without system polling * Simplify logging calls * Comment --- nano/core_test/websocket.cpp | 105 +++++++++++++++++++++++++++++++---- nano/node/node.cpp | 4 +- nano/node/websocket.cpp | 50 ++++++++++------- nano/node/websocket.hpp | 24 ++++++-- 4 files changed, 143 insertions(+), 40 deletions(-) diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index f24524ab..54b61839 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -69,15 +69,98 @@ boost::optional websocket_test_call (std::string host, std::string }); ioc.run_one_for (response_deadline); } + if (ws.is_open ()) { boost::beast::error_code ec_ignored; - ws.close (boost::beast::websocket::close_code::normal, ec_ignored); + ws.async_close (boost::beast::websocket::close_code::normal, [](boost::beast::error_code const & ec) { + // A synchronous close usually hangs in tests when the server's io_context stops looping + // An async_close solves this problem + }); } return ret; } } +/** Tests clients subscribing multiple times or unsubscribing without a subscription */ +TEST (websocket, subscription_edge) +{ + nano::system system (24000, 1); + nano::node_init init1; + nano::node_config config; + nano::node_flags node_flags; + config.websocket_config.enabled = true; + config.websocket_config.port = 24078; + + auto node1 (std::make_shared (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); + node1->start (); + system.nodes.push_back (node1); + + ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + + // First subscription + { + ack_ready = false; + std::thread subscription_thread ([]() { + websocket_test_call ("::1", "24078", R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, false); + }); + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + subscription_thread.join (); + ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + } + + // Second subscription, should not increase subscriber count, only update the subscription + { + ack_ready = false; + std::thread subscription_thread ([]() { + websocket_test_call ("::1", "24078", R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, false); + }); + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + subscription_thread.join (); + ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + } + + // First unsub + { + ack_ready = false; + std::thread unsub_thread ([]() { + websocket_test_call ("::1", "24078", R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false); + }); + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + unsub_thread.join (); + ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + } + + // Second unsub, should acknowledge but not decrease subscriber count + { + ack_ready = false; + std::thread unsub_thread ([]() { + websocket_test_call ("::1", "24078", R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false); + }); + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + unsub_thread.join (); + ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + } + + node1->stop (); +} + /** Subscribes to block confirmations, confirms a block and then awaits websocket notification */ TEST (websocket, confirmation) { @@ -98,7 +181,7 @@ TEST (websocket, confirmation) // Start websocket test-client in a separate thread ack_ready = false; std::atomic confirmation_event_received{ false }; - ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); + ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation)); std::thread client_thread ([&confirmation_event_received]() { // This will expect two results: the acknowledgement of the subscription // and then the block confirmation message @@ -121,7 +204,7 @@ TEST (websocket, confirmation) } ack_ready = false; - ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); + ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation)); nano::keypair key; system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); @@ -209,7 +292,7 @@ TEST (websocket, confirmation_options) // Start websocket test-client in a separate thread ack_ready = false; std::atomic client_thread_finished{ false }; - ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); + ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation)); std::thread client_thread ([&client_thread_finished]() { // Subscribe initially with a specific invalid account auto response = websocket_test_call ("::1", "24078", @@ -271,7 +354,7 @@ TEST (websocket, confirmation_options) } ack_ready = false; - ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); + ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation)); // Quick-confirm another block { @@ -341,7 +424,7 @@ TEST (websocket, vote) // Start websocket test-client in a separate thread ack_ready = false; std::atomic client_thread_finished{ false }; - ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote)); std::thread client_thread ([&client_thread_finished]() { // This will expect two results: the acknowledgement of the subscription // and then the vote message @@ -365,7 +448,7 @@ TEST (websocket, vote) } ack_ready = false; - ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote)); // Quick-confirm a block nano::keypair key; @@ -405,7 +488,7 @@ TEST (websocket, vote_options) // Start websocket test-client in a separate thread ack_ready = false; std::atomic client_thread_finished{ false }; - ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote)); std::thread client_thread ([&client_thread_finished]() { std::ostringstream data; data << R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": [")json" @@ -430,7 +513,7 @@ TEST (websocket, vote_options) } ack_ready = false; - ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote)); // Quick-confirm a block nano::keypair key; @@ -447,7 +530,7 @@ TEST (websocket, vote_options) // Wait for the websocket client to receive the vote message system.deadline_set (5s); - while (!client_thread_finished || node1->websocket_server->any_subscribers (nano::websocket::topic::vote)) + while (!client_thread_finished || node1->websocket_server->any_subscriber (nano::websocket::topic::vote)) { ASSERT_NO_ERROR (system.poll ()); } @@ -470,7 +553,7 @@ TEST (websocket, vote_options) } ack_ready = false; - ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote)); // Confirm another block confirm_block (); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 43d015e5..87b5f2f5 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -631,7 +631,7 @@ startup_time (std::chrono::steady_clock::now ()) if (websocket_server) { observers.blocks.add ([this](std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) { - if (this->websocket_server->any_subscribers (nano::websocket::topic::confirmation)) + if (this->websocket_server->any_subscriber (nano::websocket::topic::confirmation)) { if (this->block_arrival.recent (block_a->hash ())) { @@ -715,7 +715,7 @@ startup_time (std::chrono::steady_clock::now ()) if (this->websocket_server) { observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr vote_a, std::shared_ptr channel_a) { - if (this->websocket_server->any_subscribers (nano::websocket::topic::vote)) + if (this->websocket_server->any_subscriber (nano::websocket::topic::vote)) { nano::websocket::message_builder builder; auto msg (builder.vote_received (vote_a)); diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index e32a2c4e..a65daf8c 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -23,7 +23,7 @@ all_local_accounts (options_a.get ("all_local_accounts", false)) } else { - node.logger.always_log (boost::str (boost::format ("Websocket: invalid account provided for filtering blocks: %1%") % account_l.second.data ())); + node.logger.always_log ("Websocket: invalid account provided for filtering blocks: ", account_l.second.data ()); } } } @@ -77,7 +77,7 @@ node (node_a) } else { - node.logger.always_log (boost::str (boost::format ("Websocket: invalid account given to filter votes: %1%") % representative_l.second.data ())); + node.logger.always_log ("Websocket: invalid account given to filter votes: ", representative_l.second.data ()); } } } @@ -112,7 +112,7 @@ nano::websocket::session::~session () std::unique_lock lk (subscriptions_mutex); for (auto & subscription : subscriptions) { - ws_listener.decrease_subscription_count (subscription.first); + ws_listener.decrease_subscriber_count (subscription.first); } } } @@ -224,7 +224,7 @@ void nano::websocket::session::read () this_l->ws_listener.get_node ().logger.try_log ("Websocket: json parsing failed: ", ex.what ()); } } - else + else if (ec != boost::asio::error::eof) { this_l->ws_listener.get_node ().logger.try_log ("Websocket: read failed: ", ec.message ()); } @@ -295,21 +295,33 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const auto action_succeeded (false); if (action == "subscribe" && topic_l != nano::websocket::topic::invalid) { - auto options_l (message_a.get_child_optional ("options")); + auto options_text_l (message_a.get_child_optional ("options")); std::lock_guard lk (subscriptions_mutex); - if (topic_l == nano::websocket::topic::confirmation) + std::unique_ptr options_l{ nullptr }; + if (options_text_l && topic_l == nano::websocket::topic::confirmation) { - subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique (options_l.get (), ws_listener.get_node ()) : std::make_unique ())); + options_l = std::make_unique (options_text_l.get (), ws_listener.get_node ()); } - else if (topic_l == nano::websocket::topic::vote) + else if (options_text_l && topic_l == nano::websocket::topic::vote) { - subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique (options_l.get (), ws_listener.get_node ()) : std::make_unique ())); + options_l = std::make_unique (options_text_l.get (), ws_listener.get_node ()); } else { - subscriptions.insert (std::make_pair (topic_l, std::make_unique ())); + options_l = std::make_unique (); + } + auto existing (subscriptions.find (topic_l)); + if (existing != subscriptions.end ()) + { + existing->second = std::move (options_l); + ws_listener.get_node ().logger.always_log ("Websocket: updated subscription to topic: ", from_topic (topic_l)); + } + else + { + subscriptions.insert (std::make_pair (topic_l, std::move (options_l))); + ws_listener.get_node ().logger.always_log ("Websocket: new subscription to topic: ", from_topic (topic_l)); + ws_listener.increase_subscriber_count (topic_l); } - ws_listener.increase_subscription_count (topic_l); action_succeeded = true; } else if (action == "unsubscribe" && topic_l != nano::websocket::topic::invalid) @@ -317,7 +329,8 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const std::lock_guard lk (subscriptions_mutex); if (subscriptions.erase (topic_l)) { - ws_listener.decrease_subscription_count (topic_l); + ws_listener.get_node ().logger.always_log ("Websocket: removed subscription to topic: ", from_topic (topic_l)); + ws_listener.decrease_subscriber_count (topic_l); } action_succeeded = true; } @@ -416,19 +429,14 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a) } } -bool nano::websocket::listener::any_subscribers (nano::websocket::topic const & topic_a) +void nano::websocket::listener::increase_subscriber_count (nano::websocket::topic const & topic_a) { - return topic_subscription_count[static_cast (topic_a)] > 0; + topic_subscriber_count[static_cast (topic_a)] += 1; } -void nano::websocket::listener::increase_subscription_count (nano::websocket::topic const & topic_a) +void nano::websocket::listener::decrease_subscriber_count (nano::websocket::topic const & topic_a) { - topic_subscription_count[static_cast (topic_a)] += 1; -} - -void nano::websocket::listener::decrease_subscription_count (nano::websocket::topic const & topic_a) -{ - auto & count (topic_subscription_count[static_cast (topic_a)]); + auto & count (topic_subscriber_count[static_cast (topic_a)]); release_assert (count > 0); count -= 1; } diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index c1006d62..acf74d3e 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -230,19 +230,31 @@ namespace websocket * Per-topic subscribers check. Relies on all sessions correctly increasing and * decreasing the subscriber counts themselves. */ - bool any_subscribers (nano::websocket::topic const & topic_a); - /** Adds to subscription count of a specific topic*/ - void increase_subscription_count (nano::websocket::topic const & topic_a); - /** Removes from subscription count of a specific topic*/ - void decrease_subscription_count (nano::websocket::topic const & topic_a); + bool any_subscriber (nano::websocket::topic const & topic_a) const + { + return subscriber_count (topic_a) > 0; + } + /** Getter for subscriber count of a specific topic*/ + size_t subscriber_count (nano::websocket::topic const & topic_a) const + { + return topic_subscriber_count[static_cast (topic_a)]; + } private: + /** A websocket session can increase and decrease subscription counts. */ + friend nano::websocket::session; + + /** Adds to subscription count of a specific topic*/ + void increase_subscriber_count (nano::websocket::topic const & topic_a); + /** Removes from subscription count of a specific topic*/ + void decrease_subscriber_count (nano::websocket::topic const & topic_a); + nano::node & node; boost::asio::ip::tcp::acceptor acceptor; socket_type socket; std::mutex sessions_mutex; std::vector> sessions; - std::array, number_topics> topic_subscription_count{}; + std::array, number_topics> topic_subscriber_count{}; std::atomic stopped{ false }; }; }