Websockets - filtering options (#1907)

* Add subscription filters for accounts in wallets or a custom list of accounts

* Small syntax

* Another small leftover

* Change to a set of strings, add an assert

* Inverse logic for decode_account assert

* Remove virtual semantics from the final classes, add documentation

* Remove nano::node passing through functions, add as a member in options that need it

* Check destination account as well

* Change to should_filter

* Add test for confirmation options

* Check for subscribers

* Tests more consistent

* Test unsubscribe in the first test instead

* should_filter takes the whole message instead of contents, make message.to_string const

* Check for both destination possiblities, assert to check if none found

* Fix all tests

* Sanitize local variables and add should_filter_l return variable

* Always use async_read with a cv wait instead of sleep

* No early return

* Cancel timer safely

* Post review adjustments

* Fix repeated tests failing due to ack_ready not being reset to false

* release_assert

* Legacy blocks are always filtered, fix asserts

* Add tests for legacy blocks - without filtering options they are broadcasted, but with any filtering options they're not supported (always filtered)
This commit is contained in:
Guilherme Lawless 2019-04-18 11:55:24 +01:00 committed by cryptocode
commit 9a9fbcaa70
3 changed files with 353 additions and 30 deletions

View file

@ -1,9 +1,11 @@
#include <boost/asio.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <gtest/gtest.h>
#include <iostream>
@ -20,10 +22,17 @@ using namespace std::chrono_literals;
namespace
{
/** This variable must be set to false before setting up every thread that makes a websocket test call (and needs ack), to be safe */
std::atomic<bool> ack_ready{ false };
/** A simple blocking websocket client for testing */
std::string websocket_test_call (boost::asio::io_context & ioc, std::string host, std::string port, std::string message_a, bool await_ack)
/** An optionally blocking websocket client for testing */
boost::optional<std::string> websocket_test_call (boost::asio::io_context & ioc, std::string host, std::string port, std::string message_a, bool await_ack, bool await_response, seconds response_deadline = 5s)
{
if (await_ack)
{
ack_ready = false;
}
boost::asio::ip::tcp::resolver resolver{ ioc };
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> ws{ ioc };
@ -41,13 +50,46 @@ std::string websocket_test_call (boost::asio::io_context & ioc, std::string host
ack_ready = true;
}
boost::beast::flat_buffer buffer;
ws.read (buffer);
std::ostringstream res;
res << boost::beast::buffers (buffer.data ());
boost::optional<std::string> ret;
ws.close (boost::beast::websocket::close_code::normal);
return res.str ();
if (await_response)
{
boost::asio::deadline_timer timer (ioc);
std::atomic<bool> timed_out{ false }, got_response{ false };
std::mutex cond_mutex;
std::condition_variable cond_var;
timer.expires_from_now (boost::posix_time::seconds (response_deadline.count ()));
timer.async_wait ([&ws, &cond_mutex, &cond_var, &timed_out](boost::system::error_code const & ec) {
if (!ec)
{
std::unique_lock<std::mutex> lock (cond_mutex);
ws.next_layer ().cancel ();
timed_out = true;
cond_var.notify_one ();
}
});
boost::beast::flat_buffer buffer;
ws.async_read (buffer, [&ret, &buffer, &cond_mutex, &cond_var, &got_response](boost::beast::error_code const & ec, std::size_t const n) {
if (!ec)
{
std::unique_lock<std::mutex> lock (cond_mutex);
std::ostringstream res;
res << boost::beast::buffers (buffer.data ());
ret = res.str ();
got_response = true;
cond_var.notify_one ();
}
});
std::unique_lock<std::mutex> lock (cond_mutex);
cond_var.wait (lock, [&] { return timed_out || got_response; });
if (got_response)
{
timer.cancel ();
ws.close (boost::beast::websocket::close_code::normal);
}
}
return ret;
}
}
@ -69,17 +111,18 @@ TEST (websocket, confirmation)
system.nodes.push_back (node1);
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> confirmation_event_received{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
std::thread client_thread ([&system, &confirmation_event_received]() {
// This will expect two results: the acknowledgement of the subscription
// and then the block confirmation message
std::string response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true);
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
stream << response.get ();
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");
confirmation_event_received = true;
@ -92,21 +135,203 @@ TEST (websocket, confirmation)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
// Quick-confirm a block
nano::keypair key;
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
system.wallet (1)->insert_adhoc (key.prv);
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
auto send (std::make_shared<nano::send_block> (previous, key.pub, node1->config.online_weight_minimum.number () + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
auto balance = nano::genesis_amount;
auto send_amount = node1->config.online_weight_minimum.number () + 1;
// Quick-confirm a block, legacy blocks should work without filtering
{
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
balance -= send_amount;
auto send (std::make_shared<nano::send_block> (previous, key.pub, balance, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
}
// Wait for the websocket client to receive the confirmation message
// Wait for the confirmation to be received
system.deadline_set (5s);
while (!confirmation_event_received)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
std::atomic<bool> unsubscribe_ack_received{ false };
std::thread client_thread_2 ([&system, &unsubscribe_ack_received]() {
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response.get ();
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");
// Unsubscribe action, expects an acknowledge but no response follows
websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false);
unsubscribe_ack_received = true;
});
client_thread_2.detach ();
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
// Quick confirm a state block
{
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
balance -= send_amount;
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
}
// Wait for the unsubscribe action to be acknowledged
system.deadline_set (5s);
while (!unsubscribe_ack_received)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
node1->stop ();
}
/** Tests the filtering options of block confirmations */
TEST (websocket, confirmation_options)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
nano::uint256_union wallet;
nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ());
node1->wallets.create (wallet);
node1->start ();
system.nodes.push_back (node1);
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
std::thread client_thread ([&system, &client_thread_finished]() {
// Subscribe initially with a specific invalid account
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"accounts": ["xrb_invalid"]}})json", true, true, 1s);
ASSERT_FALSE (response);
client_thread_finished = true;
});
client_thread.detach ();
// Wait for subscribe acknowledgement
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
// Confirm a state block for an in-wallet account
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;
auto balance = nano::genesis_amount;
auto send_amount = node1->config.online_weight_minimum.number () + 1;
{
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
balance -= send_amount;
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
}
// Wait for client thread to finish, no confirmation message should be received with given filter
system.deadline_set (5s);
while (!client_thread_finished)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
std::atomic<bool> client_thread_2_finished{ false };
std::thread client_thread_2 ([&system, &client_thread_2_finished]() {
// Re-subscribe with options for all local wallet accounts
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response.get ();
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");
client_thread_2_finished = true;
});
client_thread_2.detach ();
// Wait for the subscribe action to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
// Quick-confirm another block
{
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
balance -= send_amount;
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
}
// Wait for confirmation message
system.deadline_set (5s);
while (!client_thread_2_finished)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
std::atomic<bool> client_thread_3_finished{ false };
std::thread client_thread_3 ([&system, &client_thread_3_finished]() {
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true, 1s);
ASSERT_FALSE (response);
client_thread_3_finished = true;
});
client_thread_3.detach ();
// Confirm a legacy block
// When filtering options are enabled, legacy blocks are always filtered
{
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
balance -= send_amount;
auto send (std::make_shared<nano::send_block> (previous, key.pub, balance, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
}
// Wait for client thread to finish, no confirmation message should be received
system.deadline_set (5s);
while (!client_thread_3_finished)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
node1->stop ();
}

View file

@ -4,6 +4,53 @@
#include <nano/node/node.hpp>
#include <nano/node/websocket.hpp>
nano::websocket::confirmation_options::confirmation_options (boost::property_tree::ptree const & options_a, nano::node & node_a) :
all_local_accounts (options_a.get<bool> ("all_local_accounts", false)),
node (node_a)
{
auto accounts_l (options_a.get_child_optional ("accounts"));
if (accounts_l)
{
for (auto account_l : *accounts_l)
{
// Check if the account is valid, but no error handling if it's not, simply not added to the filter
nano::account result_l (0);
if (!result_l.decode_account (account_l.second.data ()))
{
// Do not insert the given raw data to keep old prefix support
accounts.insert (result_l.to_account ());
}
}
}
}
bool nano::websocket::confirmation_options::should_filter (nano::websocket::message const & message_a) const
{
bool should_filter_l (true);
auto destination_opt_l (message_a.contents.get_optional<std::string> ("message.block.link_as_account"));
if (destination_opt_l)
{
auto source_text_l (message_a.contents.get<std::string> ("message.account"));
if (all_local_accounts)
{
auto transaction_l (node.wallets.tx_begin_read ());
nano::account source_l (0), destination_l (0);
auto decode_source_ok_l (!source_l.decode_account (source_text_l));
auto decode_destination_ok_l (!destination_l.decode_account (destination_opt_l.get ()));
assert (decode_source_ok_l && decode_destination_ok_l);
if (node.wallets.exists (transaction_l, source_l) || node.wallets.exists (transaction_l, destination_l))
{
should_filter_l = false;
}
}
if (accounts.find (source_text_l) != accounts.end () || accounts.find (destination_opt_l.get ()) != accounts.end ())
{
should_filter_l = false;
}
}
return should_filter_l;
}
nano::websocket::session::session (nano::websocket::listener & listener_a, boost::asio::ip::tcp::socket socket_a) :
ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_executor ())
{
@ -17,7 +64,7 @@ nano::websocket::session::~session ()
std::unique_lock<std::mutex> lk (subscriptions_mutex);
for (auto & subscription : subscriptions)
{
ws_listener.decrease_subscription_count (subscription);
ws_listener.decrease_subscription_count (subscription.first);
}
}
@ -54,7 +101,8 @@ void nano::websocket::session::write (nano::websocket::message message_a)
{
// clang-format off
std::unique_lock<std::mutex> lk (subscriptions_mutex);
if (message_a.topic == nano::websocket::topic::ack || subscriptions.find (message_a.topic) != subscriptions.end ())
auto subscription (subscriptions.find (message_a.topic));
if (message_a.topic == nano::websocket::topic::ack || (subscription != subscriptions.end () && !subscription->second->should_filter (message_a)))
{
lk.unlock ();
boost::asio::post (write_strand,
@ -179,8 +227,16 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const
auto action_succeeded (false);
if (action == "subscribe" && topic_l != nano::websocket::topic::invalid)
{
auto options_l (message_a.get_child_optional ("options"));
std::lock_guard<std::mutex> lk (subscriptions_mutex);
subscriptions.insert (topic_l);
if (topic_l == nano::websocket::topic::confirmation)
{
subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique<nano::websocket::confirmation_options> (options_l.get (), ws_listener.get_node ()) : std::make_unique<nano::websocket::options> ()));
}
else
{
subscriptions.insert (std::make_pair (topic_l, std::make_unique<nano::websocket::options> ()));
}
ws_listener.increase_subscription_count (topic_l);
action_succeeded = true;
}
@ -331,7 +387,7 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std:
return msg;
}
std::string nano::websocket::message::to_string ()
std::string nano::websocket::message::to_string () const
{
std::ostringstream ostream;
boost::property_tree::write_json (ostream, contents);

View file

@ -18,6 +18,7 @@
#include <nano/lib/numbers.hpp>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
@ -54,7 +55,7 @@ namespace websocket
{
}
std::string to_string ();
std::string to_string () const;
nano::websocket::topic topic;
boost::property_tree::ptree contents;
};
@ -66,6 +67,50 @@ namespace websocket
message block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype);
};
/** Filtering options for subscriptions */
class options
{
public:
/**
* Checks if a message should be filtered for default options (no options given).
* @param message_a the message to be checked
* @return false - the message should always be broadcasted
*/
virtual bool should_filter (message const & message_a) const
{
return false;
}
virtual ~options () = default;
};
/**
* Filtering options for block confirmation subscriptions
* Possible filtering options:
* * "all_local_accounts" (bool) - will only not filter blocks that have local wallet accounts as source/destination
* * "accounts" (array of std::strings) - will only not filter blocks that have these accounts as source/destination
* @remark Both options can be given, the resulting filter is an intersection of individual filters
* @remark No error is shown if any given account is invalid, the entry is simply ignored
* @warn Legacy blocks are always filtered (not broadcasted)
*/
class confirmation_options final : public options
{
public:
confirmation_options ();
confirmation_options (boost::property_tree::ptree const & options_a, nano::node & node_a);
/**
* Checks if a message should be filtered for given block confirmation options.
* @param message_a the message to be checked
* @return false if the message should be broadcasted, true if it should be filtered
*/
bool should_filter (message const & message_a) const override;
private:
nano::node & node;
bool all_local_accounts{ false };
std::unordered_set<std::string> accounts;
};
/** A websocket session managing its own lifetime */
class session final : public std::enable_shared_from_this<session>
{
@ -109,11 +154,8 @@ namespace websocket
return static_cast<std::size_t> (t);
}
};
/**
* Set of subscriptions registered by this session. In the future, contextual information
* can be added to subscription objects, such as which accounts to get confirmations for.
*/
std::unordered_set<topic, topic_hash> subscriptions;
/** Map of subscriptions -> options registered by this session. */
std::unordered_map<topic, std::unique_ptr<options>, topic_hash> subscriptions;
std::mutex subscriptions_mutex;
/** Handle incoming message */
@ -147,8 +189,8 @@ namespace websocket
}
/**
* Per-topic subscribers check. Relies on all sessions correctly increasing and
* decreasing the subscriber counts themselves.
* Per-topic subscribers check. Relies on all sessions correctly increasing and
* decreasing the subscriber counts themselves.
*/
bool any_subscribers (nano::websocket::topic const & topic_a);
/** Adds to subscription count of a specific topic*/