Websockets - subscribe to votes (#1908)

* Add a vote subscription to websockets

* Add vote subscription options to filter by representatives

* Log when invalid accounts are given for filtering (confirmation & vote)

* Filter only empty if both options are empty
This commit is contained in:
Guilherme Lawless 2019-04-18 21:56:58 +01:00 committed by cryptocode
commit 1e09b14cfb
4 changed files with 300 additions and 19 deletions

View file

@ -335,3 +335,170 @@ TEST (websocket, confirmation_options)
node1->stop ();
}
/** Subscribes to votes, sends a block and awaits websocket notification of a vote arrival */
TEST (websocket, vote)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
nano::uint256_union wallet;
nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ());
node1->wallets.create (wallet);
node1->start ();
system.nodes.push_back (node1);
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
std::thread client_thread ([&system, &client_thread_finished]() {
// This will expect two results: the acknowledgement of the subscription
// and then the vote message
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "vote", "ack": true})json", true, true);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
client_thread_finished = true;
});
client_thread.detach ();
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
// Quick-confirm a block
nano::keypair key;
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, nano::genesis_amount - (node1->config.online_weight_minimum.number () + 1), key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
// Wait for the websocket client to receive the vote message
system.deadline_set (5s);
while (!client_thread_finished)
{
ASSERT_NO_ERROR (system.poll ());
}
node1->stop ();
}
/** Tests vote subscription options */
TEST (websocket, vote_options)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
nano::uint256_union wallet;
nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ());
node1->wallets.create (wallet);
node1->start ();
system.nodes.push_back (node1);
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
std::thread client_thread ([&system, &client_thread_finished]() {
std::ostringstream data;
data << R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": [")json"
<< nano::test_genesis_key.pub.to_account ()
<< R"json("]}})json";
auto response = websocket_test_call (system.io_ctx, "::1", "24078", data.str (), true, true);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
client_thread_finished = true;
});
client_thread.detach ();
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
// Quick-confirm a block
nano::keypair key;
auto balance = nano::genesis_amount;
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
auto send_amount = node1->config.online_weight_minimum.number () + 1;
auto confirm_block = [&]() {
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
balance -= send_amount;
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
};
confirm_block ();
// Wait for the websocket client to receive the vote message
system.deadline_set (5s);
while (!client_thread_finished)
{
ASSERT_NO_ERROR (system.poll ());
}
std::atomic<bool> client_thread_2_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
std::thread client_thread_2 ([&system, &client_thread_2_finished]() {
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true, 1s);
// No response expected given the filter
ASSERT_FALSE (response);
client_thread_2_finished = true;
});
client_thread_2.detach ();
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
// Confirm another block
confirm_block ();
// No response expected
system.deadline_set (5s);
while (!client_thread_2_finished)
{
ASSERT_NO_ERROR (system.poll ());
}
node1->stop ();
}

View file

@ -1220,6 +1220,17 @@ startup_time (std::chrono::steady_clock::now ())
}
}
});
if (this->websocket_server)
{
observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a) {
if (this->websocket_server->any_subscribers (nano::websocket::topic::vote))
{
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote_a));
this->websocket_server->broadcast (msg);
}
});
}
if (NANO_VERSION_PATCH == 0)
{
logger.always_log ("Node starting, version: ", NANO_MAJOR_MINOR_VERSION);

View file

@ -13,15 +13,23 @@ node (node_a)
{
for (auto account_l : *accounts_l)
{
// Check if the account is valid, but no error handling if it's not, simply not added to the filter
nano::account result_l (0);
if (!result_l.decode_account (account_l.second.data ()))
{
// Do not insert the given raw data to keep old prefix support
accounts.insert (result_l.to_account ());
}
else
{
node.logger.always_log (boost::str (boost::format ("Websocket: invalid account provided for filtering blocks: %1%") % account_l.second.data ()));
}
}
}
// Warn the user if the options resulted in an empty filter
if (!all_local_accounts && accounts.empty ())
{
node.logger.always_log ("Websocket: provided options resulted in an empty block confirmation filter");
}
}
bool nano::websocket::confirmation_options::should_filter (nano::websocket::message const & message_a) const
@ -51,11 +59,49 @@ bool nano::websocket::confirmation_options::should_filter (nano::websocket::mess
return should_filter_l;
}
nano::websocket::vote_options::vote_options (boost::property_tree::ptree const & options_a, nano::node & node_a) :
node (node_a)
{
auto representatives_l (options_a.get_child_optional ("representatives"));
if (representatives_l)
{
for (auto representative_l : *representatives_l)
{
nano::account result_l (0);
if (!result_l.decode_account (representative_l.second.data ()))
{
// Do not insert the given raw data to keep old prefix support
representatives.insert (result_l.to_account ());
}
else
{
node.logger.always_log (boost::str (boost::format ("Websocket: invalid account given to filter votes: %1%") % representative_l.second.data ()));
}
}
}
// Warn the user if the options resulted in an empty filter
if (representatives.empty ())
{
node.logger.always_log ("Websocket: provided options resulted in an empty vote filter");
}
}
bool nano::websocket::vote_options::should_filter (nano::websocket::message const & message_a) const
{
bool should_filter_l (true);
auto representative_text_l (message_a.contents.get<std::string> ("message.account"));
if (representatives.find (representative_text_l) != representatives.end ())
{
should_filter_l = false;
}
return should_filter_l;
}
nano::websocket::session::session (nano::websocket::listener & listener_a, boost::asio::ip::tcp::socket socket_a) :
ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_executor ())
{
ws.text (true);
ws_listener.get_node ().logger.try_log ("websocket session started");
ws_listener.get_node ().logger.try_log ("Websocket: session started");
}
nano::websocket::session::~session ()
@ -68,7 +114,7 @@ nano::websocket::session::~session ()
}
}
ws_listener.get_node ().logger.try_log ("websocket session ended");
ws_listener.get_node ().logger.try_log ("Websocket: session ended");
}
void nano::websocket::session::handshake ()
@ -82,7 +128,7 @@ void nano::websocket::session::handshake ()
}
else
{
self_l->ws_listener.get_node ().logger.always_log ("websocket handshake failed: ", ec.message ());
self_l->ws_listener.get_node ().logger.always_log ("Websocket: handshake failed: ", ec.message ());
}
});
}
@ -163,12 +209,12 @@ void nano::websocket::session::read ()
}
catch (boost::property_tree::json_parser::json_parser_error const & ex)
{
self_l->ws_listener.get_node ().logger.try_log ("websocket json parsing failed: ", ex.what ());
self_l->ws_listener.get_node ().logger.try_log ("Websocket: json parsing failed: ", ex.what ());
}
}
else
{
self_l->ws_listener.get_node ().logger.try_log ("websocket read failed: ", ec.message ());
self_l->ws_listener.get_node ().logger.try_log ("Websocket: read failed: ", ec.message ());
}
});
}
@ -182,6 +228,10 @@ nano::websocket::topic to_topic (std::string topic_a)
{
topic = nano::websocket::topic::confirmation;
}
else if (topic_a == "vote")
{
topic = nano::websocket::topic::vote;
}
else if (topic_a == "ack")
{
topic = nano::websocket::topic::ack;
@ -196,6 +246,10 @@ std::string from_topic (nano::websocket::topic topic_a)
{
topic = "confirmation";
}
else if (topic_a == nano::websocket::topic::vote)
{
topic = "vote";
}
else if (topic_a == nano::websocket::topic::ack)
{
topic = "ack";
@ -233,6 +287,10 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const
{
subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique<nano::websocket::confirmation_options> (options_l.get (), ws_listener.get_node ()) : std::make_unique<nano::websocket::options> ()));
}
else if (topic_l == nano::websocket::topic::vote)
{
subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique<nano::websocket::vote_options> (options_l.get (), ws_listener.get_node ()) : std::make_unique<nano::websocket::options> ()));
}
else
{
subscriptions.insert (std::make_pair (topic_l, std::make_unique<nano::websocket::options> ()));
@ -284,7 +342,7 @@ socket (node_a.io_ctx)
}
catch (std::exception const & ex)
{
node.logger.always_log ("websocket listen failed: ", ex.what ());
node.logger.always_log ("Websocket: listen failed: ", ex.what ());
}
}
@ -308,7 +366,7 @@ void nano::websocket::listener::on_accept (boost::system::error_code ec)
{
if (ec)
{
node.logger.always_log ("websocket accept failed: ", ec.message ());
node.logger.always_log ("Websocket: accept failed: ", ec.message ());
}
else
{
@ -361,14 +419,8 @@ void nano::websocket::listener::decrease_subscription_count (nano::websocket::to
nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype)
{
nano::websocket::message msg (nano::websocket::topic::confirmation);
using namespace std::chrono;
auto milli_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()).count ();
// Common message information
boost::property_tree::ptree & message_l = msg.contents;
message_l.add ("topic", from_topic (msg.topic));
message_l.add ("time", std::to_string (milli_since_epoch));
nano::websocket::message message_l (nano::websocket::topic::confirmation);
set_common_fields (message_l);
// Block confirmation properties
boost::property_tree::ptree message_node_l;
@ -382,9 +434,31 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std:
block_node_l.add ("subtype", subtype);
}
message_node_l.add_child ("block", block_node_l);
message_l.add_child ("message", message_node_l);
message_l.contents.add_child ("message", message_node_l);
return msg;
return message_l;
}
nano::websocket::message nano::websocket::message_builder::vote_received (std::shared_ptr<nano::vote> vote_a)
{
nano::websocket::message message_l (nano::websocket::topic::vote);
set_common_fields (message_l);
// Vote information
boost::property_tree::ptree vote_node_l;
vote_a->serialize_json (vote_node_l);
message_l.contents.add_child ("message", vote_node_l);
return message_l;
}
void nano::websocket::message_builder::set_common_fields (nano::websocket::message & message_a)
{
using namespace std::chrono;
auto milli_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()).count ();
// Common message information
message_a.contents.add ("topic", from_topic (message_a.topic));
message_a.contents.add ("time", std::to_string (milli_since_epoch));
}
std::string nano::websocket::message::to_string () const

View file

@ -37,6 +37,8 @@ namespace websocket
ack,
/** A confirmation message */
confirmation,
/** A vote message **/
vote,
/** Auxiliary length, not a valid topic, must be the last enum */
_length
};
@ -65,6 +67,11 @@ namespace websocket
{
public:
message block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype);
message vote_received (std::shared_ptr<nano::vote> vote_a);
private:
/** Set the common fields for messages: timestamp and topic. */
void set_common_fields (message & message_a);
};
/** Filtering options for subscriptions */
@ -89,7 +96,6 @@ namespace websocket
* * "all_local_accounts" (bool) - will only not filter blocks that have local wallet accounts as source/destination
* * "accounts" (array of std::strings) - will only not filter blocks that have these accounts as source/destination
* @remark Both options can be given, the resulting filter is an intersection of individual filters
* @remark No error is shown if any given account is invalid, the entry is simply ignored
* @warn Legacy blocks are always filtered (not broadcasted)
*/
class confirmation_options final : public options
@ -111,6 +117,29 @@ namespace websocket
std::unordered_set<std::string> accounts;
};
/**
* Filtering options for vote subscriptions
* Possible filtering options:
* * "representatives" (array of std::strings) - will only broadcast votes from these representatives
*/
class vote_options final : public options
{
public:
vote_options ();
vote_options (boost::property_tree::ptree const & options_a, nano::node & node_a);
/**
* Checks if a message should be filtered for given vote received options.
* @param message_a the message to be checked
* @return false if the message should be broadcasted, true if it should be filtered
*/
bool should_filter (message const & message_a) const override;
private:
nano::node & node;
std::unordered_set<std::string> representatives;
};
/** A websocket session managing its own lifetime */
class session final : public std::enable_shared_from_this<session>
{