Add a fake websocket client to test the node websocket server (#2562)

This client keeps connections alive and runs in a separate thread
This commit is contained in:
Guilherme Lawless 2020-02-14 16:35:50 +00:00 committed by GitHub
commit dd14ea13aa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 321 additions and 415 deletions

View file

@ -1,6 +1,7 @@
add_executable (core_test
core_test_main.cc
testutil.hpp
fakes/websocket_client.hpp
fakes/work_peer.hpp
active_transactions.cpp
block.cpp

View file

@ -0,0 +1,74 @@
#pragma once
#include <nano/boost/asio/connect.hpp>
#include <nano/boost/asio/ip/tcp.hpp>
#include <nano/boost/beast/core.hpp>
#include <nano/boost/beast/websocket.hpp>
#include <nano/node/websocket.hpp>
#include <chrono>
using namespace std::chrono_literals;
namespace
{
// Creates its own io context
class fake_websocket_client
{
public:
fake_websocket_client (unsigned port) :
socket (std::make_shared<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>> (ioc))
{
std::string const host = "::1";
boost::asio::ip::tcp::resolver resolver{ ioc };
auto const results = resolver.resolve (host, std::to_string (port));
boost::asio::connect (socket->next_layer (), results.begin (), results.end ());
socket->handshake (host, "/");
socket->text (true);
}
~fake_websocket_client ()
{
if (socket->is_open ())
{
socket->async_close (boost::beast::websocket::close_code::normal, [socket = this->socket](boost::beast::error_code const & ec) {
// A synchronous close usually hangs in tests when the server's io_context stops looping
// An async_close solves this problem
});
}
}
void send_message (std::string const & message_a)
{
socket->write (boost::asio::buffer (message_a));
}
void await_ack ()
{
assert (socket->is_open ());
boost::beast::flat_buffer buffer;
socket->read (buffer);
}
boost::optional<std::string> get_response (std::chrono::seconds const deadline = 5s)
{
assert (deadline > 0s);
boost::optional<std::string> result;
auto buffer (std::make_shared<boost::beast::flat_buffer> ());
socket->async_read (*buffer, [&result, &buffer, socket = this->socket](boost::beast::error_code const & ec, std::size_t const /*n*/) {
if (!ec)
{
std::ostringstream res;
res << beast_buffers (buffer->data ());
result = res.str ();
}
});
ioc.run_one_for (deadline);
return result;
}
private:
boost::asio::io_context ioc;
std::shared_ptr<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>> socket;
};
}

View file

@ -1,7 +1,4 @@
#include <nano/boost/asio/connect.hpp>
#include <nano/boost/asio/ip/tcp.hpp>
#include <nano/boost/beast/core.hpp>
#include <nano/boost/beast/websocket.hpp>
#include <nano/core_test/fakes/websocket_client.hpp>
#include <nano/core_test/testutil.hpp>
#include <nano/node/testing.hpp>
#include <nano/node/websocket.hpp>
@ -19,65 +16,7 @@
using namespace std::chrono_literals;
namespace
{
/** This variable must be set to false before setting up every thread that makes a websocket test call (and needs ack), to be safe */
std::atomic<bool> ack_ready{ false };
/** An optionally blocking websocket client for testing */
boost::optional<std::string> websocket_test_call (std::string host, std::string port, std::string message_a, bool await_ack, bool await_response, const std::chrono::seconds response_deadline = 5s)
{
if (await_ack)
{
ack_ready = false;
}
boost::optional<std::string> ret;
boost::asio::io_context ioc;
boost::asio::ip::tcp::resolver resolver{ ioc };
auto ws (std::make_shared<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>> (ioc));
auto const results = resolver.resolve (host, port);
boost::asio::connect (ws->next_layer (), results.begin (), results.end ());
ws->handshake (host, "/");
ws->text (true);
ws->write (boost::asio::buffer (message_a));
if (await_ack)
{
boost::beast::flat_buffer buffer;
ws->read (buffer);
ack_ready = true;
}
if (await_response)
{
assert (response_deadline > 0s);
auto buffer (std::make_shared<boost::beast::flat_buffer> ());
ws->async_read (*buffer, [&ret, ws, buffer](boost::beast::error_code const & ec, std::size_t const n) {
if (!ec)
{
std::ostringstream res;
res << beast_buffers (buffer->data ());
ret = res.str ();
}
});
ioc.run_one_for (response_deadline);
}
if (ws->is_open ())
{
ws->async_close (boost::beast::websocket::close_code::normal, [ws](boost::beast::error_code const & ec) {
// A synchronous close usually hangs in tests when the server's io_context stops looping
// An async_close solves this problem
});
}
return ret;
}
}
/** Tests clients subscribing multiple times or unsubscribing without a subscription */
// Tests clients subscribing multiple times or unsubscribing without a subscription
TEST (websocket, subscription_edge)
{
nano::system system;
@ -88,67 +27,28 @@ TEST (websocket, subscription_edge)
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
// First subscription
{
ack_ready = false;
std::thread subscription_thread ([config]() {
websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, false);
auto task = ([config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
client.send_message (R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
EXPECT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
client.send_message (R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
EXPECT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
});
auto future = std::async (std::launch::async, task);
system.deadline_set (5s);
while (!ack_ready)
while (future.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
subscription_thread.join ();
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
}
// Second subscription, should not increase subscriber count, only update the subscription
{
ack_ready = false;
std::thread subscription_thread ([config]() {
websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, false);
});
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
subscription_thread.join ();
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
}
// First unsub
{
ack_ready = false;
std::thread unsub_thread ([config]() {
websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false);
});
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
unsub_thread.join ();
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
}
// Second unsub, should acknowledge but not decrease subscriber count
{
ack_ready = false;
std::thread unsub_thread ([config]() {
websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false);
});
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
unsub_thread.join ();
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
}
node1->stop ();
}
// Test client subscribing to changes in active_difficulty
@ -162,21 +62,22 @@ TEST (websocket, active_difficulty)
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty));
// Subscribe to active_difficulty and wait for response asynchronously
ack_ready = false;
auto client_task = ([config]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "active_difficulty", "ack": true})json", true, true);
return response;
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "active_difficulty", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty));
return client.get_response ();
});
auto client_future = std::async (std::launch::async, client_task);
auto future = std::async (std::launch::async, task);
// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty));
// Fake history records to force trended_active_difficulty change
{
@ -184,15 +85,14 @@ TEST (websocket, active_difficulty)
node1->active.multipliers_cb.push_front (10.);
}
// Wait to receive the active_difficulty message
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
while (future.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
// Check active_difficulty response
auto response = client_future.get ();
boost::optional<std::string> response = future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
@ -211,11 +111,9 @@ TEST (websocket, active_difficulty)
double multiplier = message_contents.get<double> ("multiplier");
ASSERT_NEAR (multiplier, nano::difficulty::to_multiplier (node1->active.active_difficulty (), node1->network_params.network.publish_threshold), 1e-6);
node1->stop ();
}
/** Subscribes to block confirmations, confirms a block and then awaits websocket notification */
// Subscribes to block confirmations, confirms a block and then awaits websocket notification
TEST (websocket, confirmation)
{
nano::system system;
@ -224,33 +122,33 @@ TEST (websocket, confirmation)
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> confirmation_event_received{ false };
ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation));
std::thread client_thread ([&confirmation_event_received, config]() {
// This will expect two results: the acknowledgement of the subscription
// and then the block confirmation message
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true);
ASSERT_TRUE (response);
std::atomic<bool> ack_ready{ false };
std::atomic<bool> unsubscribed{ false };
auto task = ([&ack_ready, &unsubscribed, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
auto response = client.get_response ();
EXPECT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response.get ();
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");
confirmation_event_received = true;
EXPECT_EQ (event.get<std::string> ("topic"), "confirmation");
client.send_message (R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
unsubscribed = true;
EXPECT_FALSE (client.get_response (1s));
});
auto future = std::async (std::launch::async, task);
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation));
nano::keypair key;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
@ -264,39 +162,11 @@ TEST (websocket, confirmation)
node1->process_active (send);
}
// Wait for the confirmation to be received
system.deadline_set (5s);
while (!confirmation_event_received)
while (!unsubscribed)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
client_thread.join ();
std::atomic<bool> unsubscribe_ack_received{ false };
std::thread client_thread_2 ([&unsubscribe_ack_received, config]() {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response.get ();
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");
// Unsubscribe action, expects an acknowledge but no response follows
websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, true, 1s);
unsubscribe_ack_received = true;
});
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
// Quick confirm a state block
{
@ -306,19 +176,14 @@ TEST (websocket, confirmation)
node1->process_active (send);
}
// Wait for the unsubscribe action to be acknowledged
system.deadline_set (5s);
while (!unsubscribe_ack_received)
while (future.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
client_thread_2.join ();
node1->stop ();
}
/** Tests getting notification of an erased election */
// Tests getting notification of an erased election
TEST (websocket, stopped_election)
{
nano::system system;
@ -327,30 +192,22 @@ TEST (websocket, stopped_election)
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation));
std::thread client_thread ([&client_thread_finished, config]() {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "stopped_election", "ack": "true"})json", true, true, 5s);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response.get ();
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "stopped_election");
client_thread_finished = true;
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "stopped_election", "ack": "true"})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::stopped_election));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);
// Wait for subscribe acknowledgement
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
// Create election, then erase it, causing a websocket message to be emitted
nano::keypair key1;
@ -362,18 +219,22 @@ TEST (websocket, stopped_election)
node1->block_processor.flush ();
node1->active.erase (*send1);
// Wait for subscribe acknowledgement
system.deadline_set (5s);
while (!client_thread_finished)
while (future.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
client_thread.join ();
node1->stop ();
auto response = future.get ();
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response.get ();
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "stopped_election");
}
/** Tests the filtering options of block confirmations */
// Tests the filtering options of block confirmations
TEST (websocket, confirmation_options)
{
nano::system system;
@ -382,26 +243,23 @@ TEST (websocket, confirmation_options)
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation));
std::thread client_thread ([&client_thread_finished, config]() {
// Subscribe initially with a specific invalid account
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "accounts": ["xrb_invalid"]}})json", true, true, 1s);
ASSERT_FALSE (response);
client_thread_finished = true;
std::atomic<bool> ack_ready{ false };
auto task1 = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "accounts": ["xrb_invalid"]}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
auto response = client.get_response (1s);
EXPECT_FALSE (response);
});
auto future1 = std::async (std::launch::async, task1);
// Wait for subscribe acknowledgement
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
// Confirm a state block for an in-wallet account
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
@ -416,24 +274,48 @@ TEST (websocket, confirmation_options)
previous = send->hash ();
}
// Wait for client thread to finish, no confirmation message should be received with given filter
system.deadline_set (5s);
while (!client_thread_finished)
while (future1.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
auto task2 = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true", "include_election_info": "true"}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
return client.get_response ();
});
auto future2 = std::async (std::launch::async, task2);
std::atomic<bool> client_thread_2_finished{ false };
std::thread client_thread_2 ([&client_thread_2_finished, config]() {
// Re-subscribe with options for all local wallet accounts
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true", "include_election_info": "true"}})json", true, true);
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (response);
// Quick-confirm another block
{
balance -= send_amount;
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (previous)));
node1->process_active (send);
previous = send->hash ();
}
system.deadline_set (5s);
while (future2.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
auto response2 = future2.get ();
ASSERT_TRUE (response2);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response.get ();
stream << response2.get ();
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");
try
@ -455,45 +337,23 @@ TEST (websocket, confirmation_options)
FAIL () << ex.what ();
}
client_thread_2_finished = true;
ack_ready = false;
auto task3 = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true"}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
auto response = client.get_response (1s);
EXPECT_FALSE (response);
});
auto future3 = std::async (std::launch::async, task3);
node1->block_processor.flush ();
// Wait for the subscribe action to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation));
// Quick-confirm another block
{
balance -= send_amount;
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (previous)));
node1->process_active (send);
previous = send->hash ();
}
node1->block_processor.flush ();
// Wait for confirmation message
system.deadline_set (5s);
while (!client_thread_2_finished)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
std::atomic<bool> client_thread_3_finished{ false };
std::thread client_thread_3 ([&client_thread_3_finished, config]() {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true"}})json", true, true, 1s);
ASSERT_FALSE (response);
client_thread_3_finished = true;
});
// Confirm a legacy block
// When filtering options are enabled, legacy blocks are always filtered
@ -504,22 +364,14 @@ TEST (websocket, confirmation_options)
previous = send->hash ();
}
node1->block_processor.flush ();
// Wait for client thread to finish, no confirmation message should be received
system.deadline_set (5s);
while (!client_thread_3_finished)
while (future1.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
client_thread.join ();
client_thread_2.join ();
client_thread_3.join ();
node1->stop ();
}
/** Subscribes to votes, sends a block and awaits websocket notification of a vote arrival */
// Subscribes to votes, sends a block and awaits websocket notification of a vote arrival
TEST (websocket, vote)
{
nano::system system;
@ -528,34 +380,22 @@ TEST (websocket, vote)
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote));
std::thread client_thread ([&client_thread_finished, config]() {
// This will expect two results: the acknowledgement of the subscription
// and then the vote message
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "vote", "ack": true})json", true, true);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
client_thread_finished = true;
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote));
// Quick-confirm a block
nano::keypair key;
@ -564,18 +404,22 @@ TEST (websocket, vote)
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, nano::genesis_amount - (node1->config.online_weight_minimum.number () + 1), key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (previous)));
node1->process_active (send);
// Wait for the websocket client to receive the vote message
system.deadline_set (5s);
while (!client_thread_finished)
while (future.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
client_thread.join ();
node1->stop ();
auto response = future.get ();
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
}
/** Tests vote subscription options - vote type */
// Tests vote subscription options - vote type
TEST (websocket, vote_options_type)
{
nano::system system;
@ -584,14 +428,37 @@ TEST (websocket, vote_options_type)
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"include_replays": "true", "include_indeterminate": "false"}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);
// Subscribe to votes and wait for response asynchronously
ack_ready = false;
std::atomic<bool> replay_received{ false };
std::thread client_thread ([&replay_received, config]() {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"include_replays": "true", "include_indeterminate": "false"}})json", true, true);
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
// Custom made votes for simplicity
nano::genesis genesis;
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open));
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote, nano::vote_code::replay));
node1->websocket_server->broadcast (msg);
system.deadline_set (5s);
while (future.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
auto response = future.get ();
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
@ -600,46 +467,9 @@ TEST (websocket, vote_options_type)
auto message_contents = event.get_child ("message");
ASSERT_EQ (1, message_contents.count ("type"));
ASSERT_EQ ("replay", message_contents.get<std::string> ("type"));
replay_received = true;
});
// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
// Custom made votes for simplicity
nano::genesis genesis;
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open));
// Indeterminates are not included
{
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote, nano::vote_code::indeterminate));
node1->websocket_server->broadcast (msg);
}
// Replays are included
{
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote, nano::vote_code::replay));
node1->websocket_server->broadcast (msg);
}
// Wait for the websocket client
system.deadline_set (5s);
while (!replay_received)
{
ASSERT_NO_ERROR (system.poll ());
}
client_thread.join ();
node1->stop ();
}
/** Tests vote subscription options - list of representatives */
// Tests vote subscription options - list of representatives
TEST (websocket, vote_options_representatives)
{
nano::system system;
@ -648,35 +478,29 @@ TEST (websocket, vote_options_representatives)
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote));
std::thread client_thread ([&client_thread_finished, config]() {
std::ostringstream data;
data << R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": [")json"
<< nano::test_genesis_key.pub.to_account ()
<< R"json("]}})json";
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), data.str (), true, true);
ASSERT_TRUE (response);
std::atomic<bool> ack_ready{ false };
auto task1 = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
std::string message = boost::str (boost::format (R"json({"action": "subscribe", "topic": "vote", "ack": "true", "options": {"representatives": ["%1%"]}})json") % nano::test_genesis_key.pub.to_account ());
client.send_message (message);
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
auto response = client.get_response ();
EXPECT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
client_thread_finished = true;
EXPECT_EQ (event.get<std::string> ("topic"), "vote");
});
auto future1 = std::async (std::launch::async, task1);
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote));
// Quick-confirm a block
nano::keypair key;
@ -691,22 +515,24 @@ TEST (websocket, vote_options_representatives)
};
confirm_block ();
// Wait for the websocket client to receive the vote message
system.deadline_set (5s);
while (!client_thread_finished || node1->websocket_server->any_subscriber (nano::websocket::topic::vote))
while (future1.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
std::atomic<bool> client_thread_2_finished{ false };
std::thread client_thread_2 ([&client_thread_2_finished, config]() {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true);
ack_ready = false;
auto task2 = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": "true", "options": {"representatives": ["xrb_invalid"]}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
auto response = client.get_response ();
// A list of invalid representatives is the same as no filter
ASSERT_TRUE (response);
client_thread_2_finished = true;
EXPECT_TRUE (response);
});
auto future2 = std::async (std::launch::async, task2);
// Wait for the subscription to be acknowledged
system.deadline_set (5s);
@ -714,22 +540,15 @@ TEST (websocket, vote_options_representatives)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote));
// Confirm another block
confirm_block ();
system.deadline_set (5s);
while (!client_thread_2_finished)
while (future2.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
client_thread.join ();
client_thread_2.join ();
node1->stop ();
}
// Test client subscribing to notifications for work generation
@ -744,12 +563,16 @@ TEST (websocket, work)
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::work));
// Subscribe to work and wait for response asynchronously
ack_ready = false;
auto client_task = ([config]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "work", "ack": true})json", true, true);
return response;
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "work", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::work));
return client.get_response ();
});
auto client_future = std::async (std::launch::async, client_task);
auto future = std::async (std::launch::async, task);
// Wait for acknowledge
system.deadline_set (5s);
@ -766,13 +589,13 @@ TEST (websocket, work)
// Wait for the work notification
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
while (future.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
// Check the work notification message
auto response = client_future.get ();
auto response = future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
@ -817,12 +640,16 @@ TEST (websocket, bootstrap)
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
// Subscribe to bootstrap and wait for response asynchronously
ack_ready = false;
auto client_task = ([config]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json", true, true);
return response;
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
return client.get_response ();
});
auto client_future = std::async (std::launch::async, client_task);
auto future = std::async (std::launch::async, task);
// Wait for acknowledge
system.deadline_set (5s);
@ -830,7 +657,6 @@ TEST (websocket, bootstrap)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
// Start bootstrap attempt
node1->bootstrap_initiator.bootstrap (true, "123abc");
@ -838,13 +664,13 @@ TEST (websocket, bootstrap)
// Wait for the bootstrap notification
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
while (future.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
// Check the bootstrap notification message
auto response = client_future.get ();
auto response = future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
@ -873,17 +699,15 @@ TEST (websocket, bootstrap_exited)
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
// Start bootstrap, exit after subscription
std::atomic<bool> bootstrap_started{ false };
nano::util::counted_completion subscribed_completion (1);
std::thread bootstrap_thread ([node1, &bootstrap_started, &subscribed_completion]() {
node1->bootstrap_initiator.bootstrap (true, "123abc");
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
EXPECT_NE (nullptr, attempt);
bootstrap_started = true;
ASSERT_FALSE (subscribed_completion.await_count_for (5s));
EXPECT_FALSE (subscribed_completion.await_count_for (5s));
});
// Wait for bootstrap start
@ -894,12 +718,16 @@ TEST (websocket, bootstrap_exited)
}
// Subscribe to bootstrap and wait for response asynchronously
ack_ready = false;
auto client_task = ([config]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json", true, true);
return response;
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
return client.get_response ();
});
auto client_future = std::async (std::launch::async, client_task);
auto future = std::async (std::launch::async, task);
// Wait for acknowledge
system.deadline_set (5s);
@ -907,19 +735,18 @@ TEST (websocket, bootstrap_exited)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
// Wait for the bootstrap notification
subscribed_completion.increment ();
bootstrap_thread.join ();
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
while (future.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
// Check the bootstrap notification message
auto response = client_future.get ();
auto response = future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
@ -935,7 +762,7 @@ TEST (websocket, bootstrap_exited)
ASSERT_LT (contents.get<unsigned> ("duration"), 15000);
}
/** Tests clients subscribing multiple times or unsubscribing without a subscription */
// Tests sending keepalive
TEST (websocket, ws_keepalive)
{
nano::system system;
@ -944,14 +771,16 @@ TEST (websocket, ws_keepalive)
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
ack_ready = false;
std::thread subscription_thread ([config]() {
websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "ping"})json", true, false);
auto task = ([config]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "ping"})json");
client.await_ack ();
});
auto future = std::async (std::launch::async, task);
system.deadline_set (5s);
while (!ack_ready)
while (future.wait_for (0s) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
subscription_thread.join ();
}

View file

@ -1,9 +1,11 @@
#pragma once
#include <nano/boost/asio/strand.hpp>
#include <nano/boost/beast/core.hpp>
#include <nano/boost/beast/websocket.hpp>
#include <nano/lib/blocks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/secure/common.hpp>
#include <boost/property_tree/json_parser.hpp>