From 9a9fbcaa700b6a8b6d84b786d2e6077cd3a01adc Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Thu, 18 Apr 2019 11:55:24 +0100 Subject: [PATCH] Websockets - filtering options (#1907) * Add subscription filters for accounts in wallets or a custom list of accounts * Small syntax * Another small leftover * Change to a set of strings, add an assert * Inverse logic for decode_account assert * Remove virtual semantics from the final classes, add documentation * Remove nano::node passing through functions, add as a member in options that need it * Check destination account as well * Change to should_filter * Add test for confirmation options * Check for subscribers * Tests more consistent * Test unsubscribe in the first test instead * should_filter takes the whole message instead of contents, make message.to_string const * Check for both destination possiblities, assert to check if none found * Fix all tests * Sanitize local variables and add should_filter_l return variable * Always use async_read with a cv wait instead of sleep * No early return * Cancel timer safely * Post review adjustments * Fix repeated tests failing due to ack_ready not being reset to false * release_assert * Legacy blocks are always filtered, fix asserts * Add tests for legacy blocks - without filtering options they are broadcasted, but with any filtering options they're not supported (always filtered) --- nano/core_test/websocket.cpp | 261 ++++++++++++++++++++++++++++++++--- nano/node/websocket.cpp | 64 ++++++++- nano/node/websocket.hpp | 58 ++++++-- 3 files changed, 353 insertions(+), 30 deletions(-) diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index e6e2b965..4062a947 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -1,9 +1,11 @@ +#include #include #include #include #include #include #include +#include #include #include #include @@ -20,10 +22,17 @@ using namespace std::chrono_literals; namespace { +/** This variable must be set to false before setting up every thread that makes a websocket test call (and needs ack), to be safe */ std::atomic ack_ready{ false }; -/** A simple blocking websocket client for testing */ -std::string websocket_test_call (boost::asio::io_context & ioc, std::string host, std::string port, std::string message_a, bool await_ack) + +/** An optionally blocking websocket client for testing */ +boost::optional websocket_test_call (boost::asio::io_context & ioc, std::string host, std::string port, std::string message_a, bool await_ack, bool await_response, seconds response_deadline = 5s) { + if (await_ack) + { + ack_ready = false; + } + boost::asio::ip::tcp::resolver resolver{ ioc }; boost::beast::websocket::stream ws{ ioc }; @@ -41,13 +50,46 @@ std::string websocket_test_call (boost::asio::io_context & ioc, std::string host ack_ready = true; } - boost::beast::flat_buffer buffer; - ws.read (buffer); - std::ostringstream res; - res << boost::beast::buffers (buffer.data ()); + boost::optional ret; - ws.close (boost::beast::websocket::close_code::normal); - return res.str (); + if (await_response) + { + boost::asio::deadline_timer timer (ioc); + std::atomic timed_out{ false }, got_response{ false }; + std::mutex cond_mutex; + std::condition_variable cond_var; + timer.expires_from_now (boost::posix_time::seconds (response_deadline.count ())); + timer.async_wait ([&ws, &cond_mutex, &cond_var, &timed_out](boost::system::error_code const & ec) { + if (!ec) + { + std::unique_lock lock (cond_mutex); + ws.next_layer ().cancel (); + timed_out = true; + cond_var.notify_one (); + } + }); + + boost::beast::flat_buffer buffer; + ws.async_read (buffer, [&ret, &buffer, &cond_mutex, &cond_var, &got_response](boost::beast::error_code const & ec, std::size_t const n) { + if (!ec) + { + std::unique_lock lock (cond_mutex); + std::ostringstream res; + res << boost::beast::buffers (buffer.data ()); + ret = res.str (); + got_response = true; + cond_var.notify_one (); + } + }); + std::unique_lock lock (cond_mutex); + cond_var.wait (lock, [&] { return timed_out || got_response; }); + if (got_response) + { + timer.cancel (); + ws.close (boost::beast::websocket::close_code::normal); + } + } + return ret; } } @@ -69,17 +111,18 @@ TEST (websocket, confirmation) system.nodes.push_back (node1); // Start websocket test-client in a separate thread + ack_ready = false; std::atomic confirmation_event_received{ false }; ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); std::thread client_thread ([&system, &confirmation_event_received]() { // This will expect two results: the acknowledgement of the subscription // and then the block confirmation message - std::string response = websocket_test_call (system.io_ctx, "::1", "24078", - R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true); - + auto response = websocket_test_call (system.io_ctx, "::1", "24078", + R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true); + ASSERT_TRUE (response); boost::property_tree::ptree event; std::stringstream stream; - stream << response; + stream << response.get (); boost::property_tree::read_json (stream, event); ASSERT_EQ (event.get ("topic"), "confirmation"); confirmation_event_received = true; @@ -92,21 +135,203 @@ TEST (websocket, confirmation) { ASSERT_NO_ERROR (system.poll ()); } + ack_ready = false; + ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); - // Quick-confirm a block nano::keypair key; - nano::block_hash previous (node1->latest (nano::test_genesis_key.pub)); - system.wallet (1)->insert_adhoc (key.prv); system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); - auto send (std::make_shared (previous, key.pub, node1->config.online_weight_minimum.number () + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous))); - node1->process_active (send); + auto balance = nano::genesis_amount; + auto send_amount = node1->config.online_weight_minimum.number () + 1; + // Quick-confirm a block, legacy blocks should work without filtering + { + nano::block_hash previous (node1->latest (nano::test_genesis_key.pub)); + balance -= send_amount; + auto send (std::make_shared (previous, key.pub, balance, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous))); + node1->process_active (send); + } - // Wait for the websocket client to receive the confirmation message + // Wait for the confirmation to be received system.deadline_set (5s); while (!confirmation_event_received) { ASSERT_NO_ERROR (system.poll ()); } + ack_ready = false; + + std::atomic unsubscribe_ack_received{ false }; + std::thread client_thread_2 ([&system, &unsubscribe_ack_received]() { + auto response = websocket_test_call (system.io_ctx, "::1", "24078", + R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true); + ASSERT_TRUE (response); + boost::property_tree::ptree event; + std::stringstream stream; + stream << response.get (); + boost::property_tree::read_json (stream, event); + ASSERT_EQ (event.get ("topic"), "confirmation"); + + // Unsubscribe action, expects an acknowledge but no response follows + websocket_test_call (system.io_ctx, "::1", "24078", + R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false); + unsubscribe_ack_received = true; + }); + client_thread_2.detach (); + + // Wait for the subscription to be acknowledged + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + ack_ready = false; + + // Quick confirm a state block + { + nano::block_hash previous (node1->latest (nano::test_genesis_key.pub)); + balance -= send_amount; + auto send (std::make_shared (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous))); + node1->process_active (send); + } + + // Wait for the unsubscribe action to be acknowledged + system.deadline_set (5s); + while (!unsubscribe_ack_received) + { + ASSERT_NO_ERROR (system.poll ()); + } + ack_ready = false; + + node1->stop (); +} + +/** Tests the filtering options of block confirmations */ +TEST (websocket, confirmation_options) +{ + nano::system system (24000, 1); + nano::node_init init1; + nano::node_config config; + nano::node_flags node_flags; + config.websocket_config.enabled = true; + config.websocket_config.port = 24078; + + auto node1 (std::make_shared (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); + nano::uint256_union wallet; + nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ()); + node1->wallets.create (wallet); + node1->start (); + system.nodes.push_back (node1); + + // Start websocket test-client in a separate thread + ack_ready = false; + std::atomic client_thread_finished{ false }; + ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); + std::thread client_thread ([&system, &client_thread_finished]() { + // Subscribe initially with a specific invalid account + auto response = websocket_test_call (system.io_ctx, "::1", "24078", + R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"accounts": ["xrb_invalid"]}})json", true, true, 1s); + + ASSERT_FALSE (response); + client_thread_finished = true; + }); + client_thread.detach (); + + // Wait for subscribe acknowledgement + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + ack_ready = false; + + // Confirm a state block for an in-wallet account + system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + nano::keypair key; + auto balance = nano::genesis_amount; + auto send_amount = node1->config.online_weight_minimum.number () + 1; + { + nano::block_hash previous (node1->latest (nano::test_genesis_key.pub)); + balance -= send_amount; + auto send (std::make_shared (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous))); + node1->process_active (send); + } + + // Wait for client thread to finish, no confirmation message should be received with given filter + system.deadline_set (5s); + while (!client_thread_finished) + { + ASSERT_NO_ERROR (system.poll ()); + } + ack_ready = false; + + std::atomic client_thread_2_finished{ false }; + std::thread client_thread_2 ([&system, &client_thread_2_finished]() { + // Re-subscribe with options for all local wallet accounts + auto response = websocket_test_call (system.io_ctx, "::1", "24078", + R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true); + + ASSERT_TRUE (response); + boost::property_tree::ptree event; + std::stringstream stream; + stream << response.get (); + boost::property_tree::read_json (stream, event); + ASSERT_EQ (event.get ("topic"), "confirmation"); + + client_thread_2_finished = true; + }); + client_thread_2.detach (); + + // Wait for the subscribe action to be acknowledged + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + ack_ready = false; + + ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); + + // Quick-confirm another block + { + nano::block_hash previous (node1->latest (nano::test_genesis_key.pub)); + balance -= send_amount; + auto send (std::make_shared (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous))); + node1->process_active (send); + } + + // Wait for confirmation message + system.deadline_set (5s); + while (!client_thread_2_finished) + { + ASSERT_NO_ERROR (system.poll ()); + } + ack_ready = false; + + std::atomic client_thread_3_finished{ false }; + std::thread client_thread_3 ([&system, &client_thread_3_finished]() { + auto response = websocket_test_call (system.io_ctx, "::1", "24078", + R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true, 1s); + + ASSERT_FALSE (response); + client_thread_3_finished = true; + }); + client_thread_3.detach (); + + // Confirm a legacy block + // When filtering options are enabled, legacy blocks are always filtered + { + nano::block_hash previous (node1->latest (nano::test_genesis_key.pub)); + balance -= send_amount; + auto send (std::make_shared (previous, key.pub, balance, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous))); + node1->process_active (send); + } + + // Wait for client thread to finish, no confirmation message should be received + system.deadline_set (5s); + while (!client_thread_3_finished) + { + ASSERT_NO_ERROR (system.poll ()); + } + ack_ready = false; + node1->stop (); } diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 482b71d0..b99d06e4 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -4,6 +4,53 @@ #include #include +nano::websocket::confirmation_options::confirmation_options (boost::property_tree::ptree const & options_a, nano::node & node_a) : +all_local_accounts (options_a.get ("all_local_accounts", false)), +node (node_a) +{ + auto accounts_l (options_a.get_child_optional ("accounts")); + if (accounts_l) + { + for (auto account_l : *accounts_l) + { + // Check if the account is valid, but no error handling if it's not, simply not added to the filter + nano::account result_l (0); + if (!result_l.decode_account (account_l.second.data ())) + { + // Do not insert the given raw data to keep old prefix support + accounts.insert (result_l.to_account ()); + } + } + } +} + +bool nano::websocket::confirmation_options::should_filter (nano::websocket::message const & message_a) const +{ + bool should_filter_l (true); + auto destination_opt_l (message_a.contents.get_optional ("message.block.link_as_account")); + if (destination_opt_l) + { + auto source_text_l (message_a.contents.get ("message.account")); + if (all_local_accounts) + { + auto transaction_l (node.wallets.tx_begin_read ()); + nano::account source_l (0), destination_l (0); + auto decode_source_ok_l (!source_l.decode_account (source_text_l)); + auto decode_destination_ok_l (!destination_l.decode_account (destination_opt_l.get ())); + assert (decode_source_ok_l && decode_destination_ok_l); + if (node.wallets.exists (transaction_l, source_l) || node.wallets.exists (transaction_l, destination_l)) + { + should_filter_l = false; + } + } + if (accounts.find (source_text_l) != accounts.end () || accounts.find (destination_opt_l.get ()) != accounts.end ()) + { + should_filter_l = false; + } + } + return should_filter_l; +} + nano::websocket::session::session (nano::websocket::listener & listener_a, boost::asio::ip::tcp::socket socket_a) : ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_executor ()) { @@ -17,7 +64,7 @@ nano::websocket::session::~session () std::unique_lock lk (subscriptions_mutex); for (auto & subscription : subscriptions) { - ws_listener.decrease_subscription_count (subscription); + ws_listener.decrease_subscription_count (subscription.first); } } @@ -54,7 +101,8 @@ void nano::websocket::session::write (nano::websocket::message message_a) { // clang-format off std::unique_lock lk (subscriptions_mutex); - if (message_a.topic == nano::websocket::topic::ack || subscriptions.find (message_a.topic) != subscriptions.end ()) + auto subscription (subscriptions.find (message_a.topic)); + if (message_a.topic == nano::websocket::topic::ack || (subscription != subscriptions.end () && !subscription->second->should_filter (message_a))) { lk.unlock (); boost::asio::post (write_strand, @@ -179,8 +227,16 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const auto action_succeeded (false); if (action == "subscribe" && topic_l != nano::websocket::topic::invalid) { + auto options_l (message_a.get_child_optional ("options")); std::lock_guard lk (subscriptions_mutex); - subscriptions.insert (topic_l); + if (topic_l == nano::websocket::topic::confirmation) + { + subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique (options_l.get (), ws_listener.get_node ()) : std::make_unique ())); + } + else + { + subscriptions.insert (std::make_pair (topic_l, std::make_unique ())); + } ws_listener.increase_subscription_count (topic_l); action_succeeded = true; } @@ -331,7 +387,7 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std: return msg; } -std::string nano::websocket::message::to_string () +std::string nano::websocket::message::to_string () const { std::ostringstream ostream; boost::property_tree::write_json (ostream, contents); diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index b523ccb7..b046e231 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -54,7 +55,7 @@ namespace websocket { } - std::string to_string (); + std::string to_string () const; nano::websocket::topic topic; boost::property_tree::ptree contents; }; @@ -66,6 +67,50 @@ namespace websocket message block_confirmed (std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype); }; + /** Filtering options for subscriptions */ + class options + { + public: + /** + * Checks if a message should be filtered for default options (no options given). + * @param message_a the message to be checked + * @return false - the message should always be broadcasted + */ + virtual bool should_filter (message const & message_a) const + { + return false; + } + virtual ~options () = default; + }; + + /** + * Filtering options for block confirmation subscriptions + * Possible filtering options: + * * "all_local_accounts" (bool) - will only not filter blocks that have local wallet accounts as source/destination + * * "accounts" (array of std::strings) - will only not filter blocks that have these accounts as source/destination + * @remark Both options can be given, the resulting filter is an intersection of individual filters + * @remark No error is shown if any given account is invalid, the entry is simply ignored + * @warn Legacy blocks are always filtered (not broadcasted) + */ + class confirmation_options final : public options + { + public: + confirmation_options (); + confirmation_options (boost::property_tree::ptree const & options_a, nano::node & node_a); + + /** + * Checks if a message should be filtered for given block confirmation options. + * @param message_a the message to be checked + * @return false if the message should be broadcasted, true if it should be filtered + */ + bool should_filter (message const & message_a) const override; + + private: + nano::node & node; + bool all_local_accounts{ false }; + std::unordered_set accounts; + }; + /** A websocket session managing its own lifetime */ class session final : public std::enable_shared_from_this { @@ -109,11 +154,8 @@ namespace websocket return static_cast (t); } }; - /** - * Set of subscriptions registered by this session. In the future, contextual information - * can be added to subscription objects, such as which accounts to get confirmations for. - */ - std::unordered_set subscriptions; + /** Map of subscriptions -> options registered by this session. */ + std::unordered_map, topic_hash> subscriptions; std::mutex subscriptions_mutex; /** Handle incoming message */ @@ -147,8 +189,8 @@ namespace websocket } /** - * Per-topic subscribers check. Relies on all sessions correctly increasing and - * decreasing the subscriber counts themselves. + * Per-topic subscribers check. Relies on all sessions correctly increasing and + * decreasing the subscriber counts themselves. */ bool any_subscribers (nano::websocket::topic const & topic_a); /** Adds to subscription count of a specific topic*/