diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 3d8418e2e..314c5effa 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -1,6 +1,7 @@ add_executable (core_test core_test_main.cc testutil.hpp + fakes/websocket_client.hpp fakes/work_peer.hpp active_transactions.cpp block.cpp diff --git a/nano/core_test/fakes/websocket_client.hpp b/nano/core_test/fakes/websocket_client.hpp new file mode 100644 index 000000000..e0d9664ad --- /dev/null +++ b/nano/core_test/fakes/websocket_client.hpp @@ -0,0 +1,74 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +namespace +{ +// Creates its own io context +class fake_websocket_client +{ +public: + fake_websocket_client (unsigned port) : + socket (std::make_shared> (ioc)) + { + std::string const host = "::1"; + boost::asio::ip::tcp::resolver resolver{ ioc }; + auto const results = resolver.resolve (host, std::to_string (port)); + boost::asio::connect (socket->next_layer (), results.begin (), results.end ()); + socket->handshake (host, "/"); + socket->text (true); + } + + ~fake_websocket_client () + { + if (socket->is_open ()) + { + socket->async_close (boost::beast::websocket::close_code::normal, [socket = this->socket](boost::beast::error_code const & ec) { + // A synchronous close usually hangs in tests when the server's io_context stops looping + // An async_close solves this problem + }); + } + } + + void send_message (std::string const & message_a) + { + socket->write (boost::asio::buffer (message_a)); + } + + void await_ack () + { + assert (socket->is_open ()); + boost::beast::flat_buffer buffer; + socket->read (buffer); + } + + boost::optional get_response (std::chrono::seconds const deadline = 5s) + { + assert (deadline > 0s); + boost::optional result; + auto buffer (std::make_shared ()); + socket->async_read (*buffer, [&result, &buffer, socket = this->socket](boost::beast::error_code const & ec, std::size_t const /*n*/) { + if (!ec) + { + std::ostringstream res; + res << beast_buffers (buffer->data ()); + result = res.str (); + } + }); + ioc.run_one_for (deadline); + return result; + } + +private: + boost::asio::io_context ioc; + std::shared_ptr> socket; +}; +} diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index 0630af8b6..66c242eeb 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -1,7 +1,4 @@ -#include -#include -#include -#include +#include #include #include #include @@ -19,65 +16,7 @@ using namespace std::chrono_literals; -namespace -{ -/** This variable must be set to false before setting up every thread that makes a websocket test call (and needs ack), to be safe */ -std::atomic ack_ready{ false }; - -/** An optionally blocking websocket client for testing */ -boost::optional websocket_test_call (std::string host, std::string port, std::string message_a, bool await_ack, bool await_response, const std::chrono::seconds response_deadline = 5s) -{ - if (await_ack) - { - ack_ready = false; - } - - boost::optional ret; - boost::asio::io_context ioc; - boost::asio::ip::tcp::resolver resolver{ ioc }; - auto ws (std::make_shared> (ioc)); - - auto const results = resolver.resolve (host, port); - boost::asio::connect (ws->next_layer (), results.begin (), results.end ()); - - ws->handshake (host, "/"); - ws->text (true); - ws->write (boost::asio::buffer (message_a)); - - if (await_ack) - { - boost::beast::flat_buffer buffer; - ws->read (buffer); - ack_ready = true; - } - - if (await_response) - { - assert (response_deadline > 0s); - auto buffer (std::make_shared ()); - ws->async_read (*buffer, [&ret, ws, buffer](boost::beast::error_code const & ec, std::size_t const n) { - if (!ec) - { - std::ostringstream res; - res << beast_buffers (buffer->data ()); - ret = res.str (); - } - }); - ioc.run_one_for (response_deadline); - } - - if (ws->is_open ()) - { - ws->async_close (boost::beast::websocket::close_code::normal, [ws](boost::beast::error_code const & ec) { - // A synchronous close usually hangs in tests when the server's io_context stops looping - // An async_close solves this problem - }); - } - return ret; -} -} - -/** Tests clients subscribing multiple times or unsubscribing without a subscription */ +// Tests clients subscribing multiple times or unsubscribing without a subscription TEST (websocket, subscription_edge) { nano::system system; @@ -88,67 +27,28 @@ TEST (websocket, subscription_edge) ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); - // First subscription - { - ack_ready = false; - std::thread subscription_thread ([config]() { - websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, false); - }); - system.deadline_set (5s); - while (!ack_ready) - { - ASSERT_NO_ERROR (system.poll ()); - } - subscription_thread.join (); - ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); - } + auto task = ([config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json"); + client.await_ack (); + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json"); + client.await_ack (); + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + client.send_message (R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json"); + client.await_ack (); + EXPECT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + client.send_message (R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json"); + client.await_ack (); + EXPECT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + }); + auto future = std::async (std::launch::async, task); - // Second subscription, should not increase subscriber count, only update the subscription + system.deadline_set (5s); + while (future.wait_for (0s) != std::future_status::ready) { - ack_ready = false; - std::thread subscription_thread ([config]() { - websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, false); - }); - system.deadline_set (5s); - while (!ack_ready) - { - ASSERT_NO_ERROR (system.poll ()); - } - subscription_thread.join (); - ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + ASSERT_NO_ERROR (system.poll ()); } - - // First unsub - { - ack_ready = false; - std::thread unsub_thread ([config]() { - websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false); - }); - system.deadline_set (5s); - while (!ack_ready) - { - ASSERT_NO_ERROR (system.poll ()); - } - unsub_thread.join (); - ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); - } - - // Second unsub, should acknowledge but not decrease subscriber count - { - ack_ready = false; - std::thread unsub_thread ([config]() { - websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false); - }); - system.deadline_set (5s); - while (!ack_ready) - { - ASSERT_NO_ERROR (system.poll ()); - } - unsub_thread.join (); - ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); - } - - node1->stop (); } // Test client subscribing to changes in active_difficulty @@ -162,21 +62,22 @@ TEST (websocket, active_difficulty) ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty)); - // Subscribe to active_difficulty and wait for response asynchronously - ack_ready = false; - auto client_task = ([config]() -> boost::optional { - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "active_difficulty", "ack": true})json", true, true); - return response; + std::atomic ack_ready{ false }; + auto task = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "active_difficulty", "ack": true})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty)); + return client.get_response (); }); - auto client_future = std::async (std::launch::async, client_task); + auto future = std::async (std::launch::async, task); - // Wait for acknowledge system.deadline_set (5s); while (!ack_ready) { ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty)); // Fake history records to force trended_active_difficulty change { @@ -184,15 +85,14 @@ TEST (websocket, active_difficulty) node1->active.multipliers_cb.push_front (10.); } - // Wait to receive the active_difficulty message system.deadline_set (5s); - while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready) + while (future.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } // Check active_difficulty response - auto response = client_future.get (); + boost::optional response = future.get (); ASSERT_TRUE (response); std::stringstream stream; stream << response; @@ -211,11 +111,9 @@ TEST (websocket, active_difficulty) double multiplier = message_contents.get ("multiplier"); ASSERT_NEAR (multiplier, nano::difficulty::to_multiplier (node1->active.active_difficulty (), node1->network_params.network.publish_threshold), 1e-6); - - node1->stop (); } -/** Subscribes to block confirmations, confirms a block and then awaits websocket notification */ +// Subscribes to block confirmations, confirms a block and then awaits websocket notification TEST (websocket, confirmation) { nano::system system; @@ -224,33 +122,33 @@ TEST (websocket, confirmation) config.websocket_config.port = nano::get_available_port (); auto node1 (system.add_node (config)); - // Start websocket test-client in a separate thread - ack_ready = false; - std::atomic confirmation_event_received{ false }; - ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation)); - std::thread client_thread ([&confirmation_event_received, config]() { - // This will expect two results: the acknowledgement of the subscription - // and then the block confirmation message - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true); - ASSERT_TRUE (response); + std::atomic ack_ready{ false }; + std::atomic unsubscribed{ false }; + auto task = ([&ack_ready, &unsubscribed, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + auto response = client.get_response (); + EXPECT_TRUE (response); boost::property_tree::ptree event; std::stringstream stream; stream << response.get (); boost::property_tree::read_json (stream, event); - ASSERT_EQ (event.get ("topic"), "confirmation"); - confirmation_event_received = true; + EXPECT_EQ (event.get ("topic"), "confirmation"); + client.send_message (R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json"); + client.await_ack (); + unsubscribed = true; + EXPECT_FALSE (client.get_response (1s)); }); + auto future = std::async (std::launch::async, task); - // Wait for the subscription to be acknowledged system.deadline_set (5s); while (!ack_ready) { ASSERT_NO_ERROR (system.poll ()); } - ack_ready = false; - - ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation)); nano::keypair key; system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); @@ -264,39 +162,11 @@ TEST (websocket, confirmation) node1->process_active (send); } - // Wait for the confirmation to be received system.deadline_set (5s); - while (!confirmation_event_received) + while (!unsubscribed) { ASSERT_NO_ERROR (system.poll ()); } - ack_ready = false; - client_thread.join (); - - std::atomic unsubscribe_ack_received{ false }; - std::thread client_thread_2 ([&unsubscribe_ack_received, config]() { - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true); - ASSERT_TRUE (response); - boost::property_tree::ptree event; - std::stringstream stream; - stream << response.get (); - boost::property_tree::read_json (stream, event); - ASSERT_EQ (event.get ("topic"), "confirmation"); - - // Unsubscribe action, expects an acknowledge but no response follows - websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, true, 1s); - unsubscribe_ack_received = true; - }); - - // Wait for the subscription to be acknowledged - system.deadline_set (5s); - while (!ack_ready) - { - ASSERT_NO_ERROR (system.poll ()); - } - ack_ready = false; // Quick confirm a state block { @@ -306,19 +176,14 @@ TEST (websocket, confirmation) node1->process_active (send); } - // Wait for the unsubscribe action to be acknowledged system.deadline_set (5s); - while (!unsubscribe_ack_received) + while (future.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } - ack_ready = false; - client_thread_2.join (); - - node1->stop (); } -/** Tests getting notification of an erased election */ +// Tests getting notification of an erased election TEST (websocket, stopped_election) { nano::system system; @@ -327,30 +192,22 @@ TEST (websocket, stopped_election) config.websocket_config.port = nano::get_available_port (); auto node1 (system.add_node (config)); - // Start websocket test-client in a separate thread - ack_ready = false; - std::atomic client_thread_finished{ false }; - ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation)); - std::thread client_thread ([&client_thread_finished, config]() { - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "subscribe", "topic": "stopped_election", "ack": "true"})json", true, true, 5s); - - ASSERT_TRUE (response); - boost::property_tree::ptree event; - std::stringstream stream; - stream << response.get (); - boost::property_tree::read_json (stream, event); - ASSERT_EQ (event.get ("topic"), "stopped_election"); - client_thread_finished = true; + std::atomic ack_ready{ false }; + auto task = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "stopped_election", "ack": "true"})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::stopped_election)); + return client.get_response (); }); + auto future = std::async (std::launch::async, task); - // Wait for subscribe acknowledgement system.deadline_set (5s); while (!ack_ready) { ASSERT_NO_ERROR (system.poll ()); } - ack_ready = false; // Create election, then erase it, causing a websocket message to be emitted nano::keypair key1; @@ -362,18 +219,22 @@ TEST (websocket, stopped_election) node1->block_processor.flush (); node1->active.erase (*send1); - // Wait for subscribe acknowledgement system.deadline_set (5s); - while (!client_thread_finished) + while (future.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } - client_thread.join (); - node1->stop (); + auto response = future.get (); + ASSERT_TRUE (response); + boost::property_tree::ptree event; + std::stringstream stream; + stream << response.get (); + boost::property_tree::read_json (stream, event); + ASSERT_EQ (event.get ("topic"), "stopped_election"); } -/** Tests the filtering options of block confirmations */ +// Tests the filtering options of block confirmations TEST (websocket, confirmation_options) { nano::system system; @@ -382,26 +243,23 @@ TEST (websocket, confirmation_options) config.websocket_config.port = nano::get_available_port (); auto node1 (system.add_node (config)); - // Start websocket test-client in a separate thread - ack_ready = false; - std::atomic client_thread_finished{ false }; - ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation)); - std::thread client_thread ([&client_thread_finished, config]() { - // Subscribe initially with a specific invalid account - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "accounts": ["xrb_invalid"]}})json", true, true, 1s); - - ASSERT_FALSE (response); - client_thread_finished = true; + std::atomic ack_ready{ false }; + auto task1 = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "accounts": ["xrb_invalid"]}})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + auto response = client.get_response (1s); + EXPECT_FALSE (response); }); + auto future1 = std::async (std::launch::async, task1); - // Wait for subscribe acknowledgement system.deadline_set (5s); while (!ack_ready) { ASSERT_NO_ERROR (system.poll ()); } - ack_ready = false; // Confirm a state block for an in-wallet account system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); @@ -416,58 +274,28 @@ TEST (websocket, confirmation_options) previous = send->hash (); } - // Wait for client thread to finish, no confirmation message should be received with given filter system.deadline_set (5s); - while (!client_thread_finished) + while (future1.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } + ack_ready = false; - - std::atomic client_thread_2_finished{ false }; - std::thread client_thread_2 ([&client_thread_2_finished, config]() { - // Re-subscribe with options for all local wallet accounts - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true", "include_election_info": "true"}})json", true, true); - - ASSERT_TRUE (response); - boost::property_tree::ptree event; - std::stringstream stream; - stream << response.get (); - boost::property_tree::read_json (stream, event); - ASSERT_EQ (event.get ("topic"), "confirmation"); - try - { - boost::property_tree::ptree election_info = event.get_child ("message.election_info"); - auto tally (election_info.get ("tally")); - auto time (election_info.get ("time")); - // Duration and request count may be zero on testnet, so we only check that they're present - ASSERT_EQ (1, election_info.count ("duration")); - ASSERT_EQ (1, election_info.count ("request_count")); - ASSERT_EQ (1, election_info.count ("voters")); - ASSERT_GE (1, election_info.get ("blocks")); - // Make sure tally and time are non-zero. - ASSERT_NE ("0", tally); - ASSERT_NE ("0", time); - } - catch (std::runtime_error const & ex) - { - FAIL () << ex.what (); - } - - client_thread_2_finished = true; + auto task2 = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true", "include_election_info": "true"}})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + return client.get_response (); }); + auto future2 = std::async (std::launch::async, task2); - node1->block_processor.flush (); - // Wait for the subscribe action to be acknowledged system.deadline_set (5s); while (!ack_ready) { ASSERT_NO_ERROR (system.poll ()); } - ack_ready = false; - - ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation)); // Quick-confirm another block { @@ -477,23 +305,55 @@ TEST (websocket, confirmation_options) previous = send->hash (); } - node1->block_processor.flush (); - // Wait for confirmation message system.deadline_set (5s); - while (!client_thread_2_finished) + while (future2.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } + + auto response2 = future2.get (); + ASSERT_TRUE (response2); + boost::property_tree::ptree event; + std::stringstream stream; + stream << response2.get (); + boost::property_tree::read_json (stream, event); + ASSERT_EQ (event.get ("topic"), "confirmation"); + try + { + boost::property_tree::ptree election_info = event.get_child ("message.election_info"); + auto tally (election_info.get ("tally")); + auto time (election_info.get ("time")); + // Duration and request count may be zero on testnet, so we only check that they're present + ASSERT_EQ (1, election_info.count ("duration")); + ASSERT_EQ (1, election_info.count ("request_count")); + ASSERT_EQ (1, election_info.count ("voters")); + ASSERT_GE (1, election_info.get ("blocks")); + // Make sure tally and time are non-zero. + ASSERT_NE ("0", tally); + ASSERT_NE ("0", time); + } + catch (std::runtime_error const & ex) + { + FAIL () << ex.what (); + } + ack_ready = false; - - std::atomic client_thread_3_finished{ false }; - std::thread client_thread_3 ([&client_thread_3_finished, config]() { - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true"}})json", true, true, 1s); - - ASSERT_FALSE (response); - client_thread_3_finished = true; + auto task3 = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true"}})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + auto response = client.get_response (1s); + EXPECT_FALSE (response); }); + auto future3 = std::async (std::launch::async, task3); + + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } // Confirm a legacy block // When filtering options are enabled, legacy blocks are always filtered @@ -504,22 +364,14 @@ TEST (websocket, confirmation_options) previous = send->hash (); } - node1->block_processor.flush (); - // Wait for client thread to finish, no confirmation message should be received system.deadline_set (5s); - while (!client_thread_3_finished) + while (future1.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } - ack_ready = false; - - client_thread.join (); - client_thread_2.join (); - client_thread_3.join (); - node1->stop (); } -/** Subscribes to votes, sends a block and awaits websocket notification of a vote arrival */ +// Subscribes to votes, sends a block and awaits websocket notification of a vote arrival TEST (websocket, vote) { nano::system system; @@ -528,34 +380,22 @@ TEST (websocket, vote) config.websocket_config.port = nano::get_available_port (); auto node1 (system.add_node (config)); - // Start websocket test-client in a separate thread - ack_ready = false; - std::atomic client_thread_finished{ false }; - ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote)); - std::thread client_thread ([&client_thread_finished, config]() { - // This will expect two results: the acknowledgement of the subscription - // and then the vote message - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "subscribe", "topic": "vote", "ack": true})json", true, true); - - ASSERT_TRUE (response); - boost::property_tree::ptree event; - std::stringstream stream; - stream << response; - boost::property_tree::read_json (stream, event); - ASSERT_EQ (event.get ("topic"), "vote"); - client_thread_finished = true; + std::atomic ack_ready{ false }; + auto task = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": true})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); + return client.get_response (); }); + auto future = std::async (std::launch::async, task); - // Wait for the subscription to be acknowledged system.deadline_set (5s); while (!ack_ready) { ASSERT_NO_ERROR (system.poll ()); } - ack_ready = false; - - ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote)); // Quick-confirm a block nano::keypair key; @@ -564,18 +404,22 @@ TEST (websocket, vote) auto send (std::make_shared (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, nano::genesis_amount - (node1->config.online_weight_minimum.number () + 1), key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (previous))); node1->process_active (send); - // Wait for the websocket client to receive the vote message system.deadline_set (5s); - while (!client_thread_finished) + while (future.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } - client_thread.join (); - node1->stop (); + auto response = future.get (); + ASSERT_TRUE (response); + boost::property_tree::ptree event; + std::stringstream stream; + stream << response; + boost::property_tree::read_json (stream, event); + ASSERT_EQ (event.get ("topic"), "vote"); } -/** Tests vote subscription options - vote type */ +// Tests vote subscription options - vote type TEST (websocket, vote_options_type) { nano::system system; @@ -584,62 +428,48 @@ TEST (websocket, vote_options_type) config.websocket_config.port = nano::get_available_port (); auto node1 (system.add_node (config)); - ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); - - // Subscribe to votes and wait for response asynchronously - ack_ready = false; - std::atomic replay_received{ false }; - std::thread client_thread ([&replay_received, config]() { - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"include_replays": "true", "include_indeterminate": "false"}})json", true, true); - ASSERT_TRUE (response); - boost::property_tree::ptree event; - std::stringstream stream; - stream << response; - boost::property_tree::read_json (stream, event); - auto message_contents = event.get_child ("message"); - ASSERT_EQ (1, message_contents.count ("type")); - ASSERT_EQ ("replay", message_contents.get ("type")); - replay_received = true; + std::atomic ack_ready{ false }; + auto task = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"include_replays": "true", "include_indeterminate": "false"}})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); + return client.get_response (); }); + auto future = std::async (std::launch::async, task); - // Wait for acknowledge system.deadline_set (5s); while (!ack_ready) { ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); // Custom made votes for simplicity nano::genesis genesis; auto vote (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open)); + nano::websocket::message_builder builder; + auto msg (builder.vote_received (vote, nano::vote_code::replay)); + node1->websocket_server->broadcast (msg); - // Indeterminates are not included - { - nano::websocket::message_builder builder; - auto msg (builder.vote_received (vote, nano::vote_code::indeterminate)); - node1->websocket_server->broadcast (msg); - } - - // Replays are included - { - nano::websocket::message_builder builder; - auto msg (builder.vote_received (vote, nano::vote_code::replay)); - node1->websocket_server->broadcast (msg); - } - - // Wait for the websocket client system.deadline_set (5s); - while (!replay_received) + while (future.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } - client_thread.join (); - node1->stop (); + + auto response = future.get (); + ASSERT_TRUE (response); + boost::property_tree::ptree event; + std::stringstream stream; + stream << response; + boost::property_tree::read_json (stream, event); + auto message_contents = event.get_child ("message"); + ASSERT_EQ (1, message_contents.count ("type")); + ASSERT_EQ ("replay", message_contents.get ("type")); } -/** Tests vote subscription options - list of representatives */ +// Tests vote subscription options - list of representatives TEST (websocket, vote_options_representatives) { nano::system system; @@ -648,35 +478,29 @@ TEST (websocket, vote_options_representatives) config.websocket_config.port = nano::get_available_port (); auto node1 (system.add_node (config)); - // Start websocket test-client in a separate thread - ack_ready = false; - std::atomic client_thread_finished{ false }; - ASSERT_FALSE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote)); - std::thread client_thread ([&client_thread_finished, config]() { - std::ostringstream data; - data << R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": [")json" - << nano::test_genesis_key.pub.to_account () - << R"json("]}})json"; - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), data.str (), true, true); - - ASSERT_TRUE (response); + std::atomic ack_ready{ false }; + auto task1 = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + std::string message = boost::str (boost::format (R"json({"action": "subscribe", "topic": "vote", "ack": "true", "options": {"representatives": ["%1%"]}})json") % nano::test_genesis_key.pub.to_account ()); + client.send_message (message); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); + auto response = client.get_response (); + EXPECT_TRUE (response); boost::property_tree::ptree event; std::stringstream stream; stream << response; boost::property_tree::read_json (stream, event); - ASSERT_EQ (event.get ("topic"), "vote"); - client_thread_finished = true; + EXPECT_EQ (event.get ("topic"), "vote"); }); + auto future1 = std::async (std::launch::async, task1); - // Wait for the subscription to be acknowledged system.deadline_set (5s); while (!ack_ready) { ASSERT_NO_ERROR (system.poll ()); } - ack_ready = false; - - ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote)); // Quick-confirm a block nano::keypair key; @@ -691,22 +515,24 @@ TEST (websocket, vote_options_representatives) }; confirm_block (); - // Wait for the websocket client to receive the vote message system.deadline_set (5s); - while (!client_thread_finished || node1->websocket_server->any_subscriber (nano::websocket::topic::vote)) + while (future1.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } - std::atomic client_thread_2_finished{ false }; - std::thread client_thread_2 ([&client_thread_2_finished, config]() { - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true); - + ack_ready = false; + auto task2 = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": "true", "options": {"representatives": ["xrb_invalid"]}})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); + auto response = client.get_response (); // A list of invalid representatives is the same as no filter - ASSERT_TRUE (response); - client_thread_2_finished = true; + EXPECT_TRUE (response); }); + auto future2 = std::async (std::launch::async, task2); // Wait for the subscription to be acknowledged system.deadline_set (5s); @@ -714,22 +540,15 @@ TEST (websocket, vote_options_representatives) { ASSERT_NO_ERROR (system.poll ()); } - ack_ready = false; - - ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::vote)); // Confirm another block confirm_block (); system.deadline_set (5s); - while (!client_thread_2_finished) + while (future2.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } - - client_thread.join (); - client_thread_2.join (); - node1->stop (); } // Test client subscribing to notifications for work generation @@ -744,12 +563,16 @@ TEST (websocket, work) ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::work)); // Subscribe to work and wait for response asynchronously - ack_ready = false; - auto client_task = ([config]() -> boost::optional { - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "work", "ack": true})json", true, true); - return response; + std::atomic ack_ready{ false }; + auto task = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "work", "ack": true})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::work)); + return client.get_response (); }); - auto client_future = std::async (std::launch::async, client_task); + auto future = std::async (std::launch::async, task); // Wait for acknowledge system.deadline_set (5s); @@ -766,13 +589,13 @@ TEST (websocket, work) // Wait for the work notification system.deadline_set (5s); - while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready) + while (future.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } // Check the work notification message - auto response = client_future.get (); + auto response = future.get (); ASSERT_TRUE (response); std::stringstream stream; stream << response; @@ -817,12 +640,16 @@ TEST (websocket, bootstrap) ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap)); // Subscribe to bootstrap and wait for response asynchronously - ack_ready = false; - auto client_task = ([config]() -> boost::optional { - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json", true, true); - return response; + std::atomic ack_ready{ false }; + auto task = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap)); + return client.get_response (); }); - auto client_future = std::async (std::launch::async, client_task); + auto future = std::async (std::launch::async, task); // Wait for acknowledge system.deadline_set (5s); @@ -830,7 +657,6 @@ TEST (websocket, bootstrap) { ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap)); // Start bootstrap attempt node1->bootstrap_initiator.bootstrap (true, "123abc"); @@ -838,13 +664,13 @@ TEST (websocket, bootstrap) // Wait for the bootstrap notification system.deadline_set (5s); - while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready) + while (future.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } // Check the bootstrap notification message - auto response = client_future.get (); + auto response = future.get (); ASSERT_TRUE (response); std::stringstream stream; stream << response; @@ -873,17 +699,15 @@ TEST (websocket, bootstrap_exited) config.websocket_config.port = nano::get_available_port (); auto node1 (system.add_node (config)); - ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap)); - // Start bootstrap, exit after subscription std::atomic bootstrap_started{ false }; nano::util::counted_completion subscribed_completion (1); std::thread bootstrap_thread ([node1, &bootstrap_started, &subscribed_completion]() { node1->bootstrap_initiator.bootstrap (true, "123abc"); auto attempt (node1->bootstrap_initiator.current_attempt ()); - ASSERT_NE (nullptr, attempt); + EXPECT_NE (nullptr, attempt); bootstrap_started = true; - ASSERT_FALSE (subscribed_completion.await_count_for (5s)); + EXPECT_FALSE (subscribed_completion.await_count_for (5s)); }); // Wait for bootstrap start @@ -894,12 +718,16 @@ TEST (websocket, bootstrap_exited) } // Subscribe to bootstrap and wait for response asynchronously - ack_ready = false; - auto client_task = ([config]() -> boost::optional { - auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json", true, true); - return response; + std::atomic ack_ready{ false }; + auto task = ([&ack_ready, config, &node1]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json"); + client.await_ack (); + ack_ready = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap)); + return client.get_response (); }); - auto client_future = std::async (std::launch::async, client_task); + auto future = std::async (std::launch::async, task); // Wait for acknowledge system.deadline_set (5s); @@ -907,19 +735,18 @@ TEST (websocket, bootstrap_exited) { ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap)); // Wait for the bootstrap notification subscribed_completion.increment (); bootstrap_thread.join (); system.deadline_set (5s); - while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready) + while (future.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } // Check the bootstrap notification message - auto response = client_future.get (); + auto response = future.get (); ASSERT_TRUE (response); std::stringstream stream; stream << response; @@ -935,7 +762,7 @@ TEST (websocket, bootstrap_exited) ASSERT_LT (contents.get ("duration"), 15000); } -/** Tests clients subscribing multiple times or unsubscribing without a subscription */ +// Tests sending keepalive TEST (websocket, ws_keepalive) { nano::system system; @@ -944,14 +771,16 @@ TEST (websocket, ws_keepalive) config.websocket_config.port = nano::get_available_port (); auto node1 (system.add_node (config)); - ack_ready = false; - std::thread subscription_thread ([config]() { - websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "ping"})json", true, false); + auto task = ([config]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "ping"})json"); + client.await_ack (); }); + auto future = std::async (std::launch::async, task); + system.deadline_set (5s); - while (!ack_ready) + while (future.wait_for (0s) != std::future_status::ready) { ASSERT_NO_ERROR (system.poll ()); } - subscription_thread.join (); } diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index a4537e8c2..890f6cb9c 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -1,9 +1,11 @@ #pragma once +#include #include #include #include #include +#include #include