Websockets: prevent re-sub (#2001)

* Differentiate between new subscription and an update, increasing subscription count only in the first case

* EOF is clean closed connection, no need to log as error

* Rename some functions, make increase/decrease subscriber count methods private (session is now a friend)

* Fix and add a getter for subscriber count per topic

* Fix subscriber counts

* Fix tests and add a new one to test re-sub or un-sub when no sub exists

* Add temporary fix for crashing when running in debug

* Remove sleep, works with detaching but hangs without

* Use async_close to avoid getting stuck on close() without system polling

* Simplify logging calls

* Comment
This commit is contained in:
Guilherme Lawless 2019-05-25 15:35:02 +01:00 committed by cryptocode
commit 80370ae4fd
4 changed files with 143 additions and 40 deletions

View file

@ -69,15 +69,98 @@ boost::optional<std::string> websocket_test_call (std::string host, std::string
});
ioc.run_one_for (response_deadline);
}
if (ws.is_open ())
{
boost::beast::error_code ec_ignored;
ws.close (boost::beast::websocket::close_code::normal, ec_ignored);
ws.async_close (boost::beast::websocket::close_code::normal, [](boost::beast::error_code const & ec) {
// A synchronous close usually hangs in tests when the server's io_context stops looping
// An async_close solves this problem
});
}
return ret;
}
}
/** Tests clients subscribing multiple times or unsubscribing without a subscription */
TEST (websocket, subscription_edge)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
// First subscription
{
ack_ready = false;
std::thread subscription_thread ([]() {
websocket_test_call ("::1", "24078", R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, false);
});
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
subscription_thread.join ();
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
}
// Second subscription, should not increase subscriber count, only update the subscription
{
ack_ready = false;
std::thread subscription_thread ([]() {
websocket_test_call ("::1", "24078", R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, false);
});
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
subscription_thread.join ();
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
}
// First unsub
{
ack_ready = false;
std::thread unsub_thread ([]() {
websocket_test_call ("::1", "24078", R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false);
});
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
unsub_thread.join ();
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
}
// Second unsub, should acknowledge but not decrease subscriber count
{
ack_ready = false;
std::thread unsub_thread ([]() {
websocket_test_call ("::1", "24078", R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false);
});
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
unsub_thread.join ();
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
}
node1->stop ();
}
/** Subscribes to block confirmations, confirms a block and then awaits websocket notification */
TEST (websocket, confirmation)
{
@ -98,7 +181,7 @@ TEST (websocket, confirmation)
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> confirmation_event_received{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation));
std::thread client_thread ([&confirmation_event_received]() {
// This will expect two results: the acknowledgement of the subscription
// and then the block confirmation message
@ -121,7 +204,7 @@ TEST (websocket, confirmation)
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation));
nano::keypair key;
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
@ -209,7 +292,7 @@ TEST (websocket, confirmation_options)
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation));
std::thread client_thread ([&client_thread_finished]() {
// Subscribe initially with a specific invalid account
auto response = websocket_test_call ("::1", "24078",
@ -271,7 +354,7 @@ TEST (websocket, confirmation_options)
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation));
// Quick-confirm another block
{
@ -341,7 +424,7 @@ TEST (websocket, vote)
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote));
std::thread client_thread ([&client_thread_finished]() {
// This will expect two results: the acknowledgement of the subscription
// and then the vote message
@ -365,7 +448,7 @@ TEST (websocket, vote)
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote));
// Quick-confirm a block
nano::keypair key;
@ -405,7 +488,7 @@ TEST (websocket, vote_options)
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote));
std::thread client_thread ([&client_thread_finished]() {
std::ostringstream data;
data << R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": [")json"
@ -430,7 +513,7 @@ TEST (websocket, vote_options)
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote));
// Quick-confirm a block
nano::keypair key;
@ -447,7 +530,7 @@ TEST (websocket, vote_options)
// Wait for the websocket client to receive the vote message
system.deadline_set (5s);
while (!client_thread_finished || node1->websocket_server->any_subscribers (nano::websocket::topic::vote))
while (!client_thread_finished || node1->websocket_server->any_subscriber (nano::websocket::topic::vote))
{
ASSERT_NO_ERROR (system.poll ());
}
@ -470,7 +553,7 @@ TEST (websocket, vote_options)
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote));
// Confirm another block
confirm_block ();

View file

@ -631,7 +631,7 @@ startup_time (std::chrono::steady_clock::now ())
if (websocket_server)
{
observers.blocks.add ([this](std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
if (this->websocket_server->any_subscribers (nano::websocket::topic::confirmation))
if (this->websocket_server->any_subscriber (nano::websocket::topic::confirmation))
{
if (this->block_arrival.recent (block_a->hash ()))
{
@ -715,7 +715,7 @@ startup_time (std::chrono::steady_clock::now ())
if (this->websocket_server)
{
observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a) {
if (this->websocket_server->any_subscribers (nano::websocket::topic::vote))
if (this->websocket_server->any_subscriber (nano::websocket::topic::vote))
{
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote_a));

View file

@ -23,7 +23,7 @@ all_local_accounts (options_a.get<bool> ("all_local_accounts", false))
}
else
{
node.logger.always_log (boost::str (boost::format ("Websocket: invalid account provided for filtering blocks: %1%") % account_l.second.data ()));
node.logger.always_log ("Websocket: invalid account provided for filtering blocks: ", account_l.second.data ());
}
}
}
@ -77,7 +77,7 @@ node (node_a)
}
else
{
node.logger.always_log (boost::str (boost::format ("Websocket: invalid account given to filter votes: %1%") % representative_l.second.data ()));
node.logger.always_log ("Websocket: invalid account given to filter votes: ", representative_l.second.data ());
}
}
}
@ -112,7 +112,7 @@ nano::websocket::session::~session ()
std::unique_lock<std::mutex> lk (subscriptions_mutex);
for (auto & subscription : subscriptions)
{
ws_listener.decrease_subscription_count (subscription.first);
ws_listener.decrease_subscriber_count (subscription.first);
}
}
}
@ -224,7 +224,7 @@ void nano::websocket::session::read ()
this_l->ws_listener.get_node ().logger.try_log ("Websocket: json parsing failed: ", ex.what ());
}
}
else
else if (ec != boost::asio::error::eof)
{
this_l->ws_listener.get_node ().logger.try_log ("Websocket: read failed: ", ec.message ());
}
@ -295,21 +295,33 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const
auto action_succeeded (false);
if (action == "subscribe" && topic_l != nano::websocket::topic::invalid)
{
auto options_l (message_a.get_child_optional ("options"));
auto options_text_l (message_a.get_child_optional ("options"));
std::lock_guard<std::mutex> lk (subscriptions_mutex);
if (topic_l == nano::websocket::topic::confirmation)
std::unique_ptr<nano::websocket::options> options_l{ nullptr };
if (options_text_l && topic_l == nano::websocket::topic::confirmation)
{
subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique<nano::websocket::confirmation_options> (options_l.get (), ws_listener.get_node ()) : std::make_unique<nano::websocket::options> ()));
options_l = std::make_unique<nano::websocket::confirmation_options> (options_text_l.get (), ws_listener.get_node ());
}
else if (topic_l == nano::websocket::topic::vote)
else if (options_text_l && topic_l == nano::websocket::topic::vote)
{
subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique<nano::websocket::vote_options> (options_l.get (), ws_listener.get_node ()) : std::make_unique<nano::websocket::options> ()));
options_l = std::make_unique<nano::websocket::vote_options> (options_text_l.get (), ws_listener.get_node ());
}
else
{
subscriptions.insert (std::make_pair (topic_l, std::make_unique<nano::websocket::options> ()));
options_l = std::make_unique<nano::websocket::options> ();
}
auto existing (subscriptions.find (topic_l));
if (existing != subscriptions.end ())
{
existing->second = std::move (options_l);
ws_listener.get_node ().logger.always_log ("Websocket: updated subscription to topic: ", from_topic (topic_l));
}
else
{
subscriptions.insert (std::make_pair (topic_l, std::move (options_l)));
ws_listener.get_node ().logger.always_log ("Websocket: new subscription to topic: ", from_topic (topic_l));
ws_listener.increase_subscriber_count (topic_l);
}
ws_listener.increase_subscription_count (topic_l);
action_succeeded = true;
}
else if (action == "unsubscribe" && topic_l != nano::websocket::topic::invalid)
@ -317,7 +329,8 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const
std::lock_guard<std::mutex> lk (subscriptions_mutex);
if (subscriptions.erase (topic_l))
{
ws_listener.decrease_subscription_count (topic_l);
ws_listener.get_node ().logger.always_log ("Websocket: removed subscription to topic: ", from_topic (topic_l));
ws_listener.decrease_subscriber_count (topic_l);
}
action_succeeded = true;
}
@ -416,19 +429,14 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a)
}
}
bool nano::websocket::listener::any_subscribers (nano::websocket::topic const & topic_a)
void nano::websocket::listener::increase_subscriber_count (nano::websocket::topic const & topic_a)
{
return topic_subscription_count[static_cast<std::size_t> (topic_a)] > 0;
topic_subscriber_count[static_cast<std::size_t> (topic_a)] += 1;
}
void nano::websocket::listener::increase_subscription_count (nano::websocket::topic const & topic_a)
void nano::websocket::listener::decrease_subscriber_count (nano::websocket::topic const & topic_a)
{
topic_subscription_count[static_cast<std::size_t> (topic_a)] += 1;
}
void nano::websocket::listener::decrease_subscription_count (nano::websocket::topic const & topic_a)
{
auto & count (topic_subscription_count[static_cast<std::size_t> (topic_a)]);
auto & count (topic_subscriber_count[static_cast<std::size_t> (topic_a)]);
release_assert (count > 0);
count -= 1;
}

View file

@ -230,19 +230,31 @@ namespace websocket
* Per-topic subscribers check. Relies on all sessions correctly increasing and
* decreasing the subscriber counts themselves.
*/
bool any_subscribers (nano::websocket::topic const & topic_a);
/** Adds to subscription count of a specific topic*/
void increase_subscription_count (nano::websocket::topic const & topic_a);
/** Removes from subscription count of a specific topic*/
void decrease_subscription_count (nano::websocket::topic const & topic_a);
bool any_subscriber (nano::websocket::topic const & topic_a) const
{
return subscriber_count (topic_a) > 0;
}
/** Getter for subscriber count of a specific topic*/
size_t subscriber_count (nano::websocket::topic const & topic_a) const
{
return topic_subscriber_count[static_cast<std::size_t> (topic_a)];
}
private:
/** A websocket session can increase and decrease subscription counts. */
friend nano::websocket::session;
/** Adds to subscription count of a specific topic*/
void increase_subscriber_count (nano::websocket::topic const & topic_a);
/** Removes from subscription count of a specific topic*/
void decrease_subscriber_count (nano::websocket::topic const & topic_a);
nano::node & node;
boost::asio::ip::tcp::acceptor acceptor;
socket_type socket;
std::mutex sessions_mutex;
std::vector<std::weak_ptr<session>> sessions;
std::array<std::atomic<std::size_t>, number_topics> topic_subscription_count{};
std::array<std::atomic<std::size_t>, number_topics> topic_subscriber_count{};
std::atomic<bool> stopped{ false };
};
}