From 6c90bc220c7f6ff9fe64b1244267dc8d27882d4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20W=C3=B3jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 24 Jan 2023 02:06:25 +0100 Subject: [PATCH] Move websocket initialization code out of node constructor (#4068) --- nano/core_test/websocket.cpp | 98 ++++++++++---------- nano/node/blockprocessor.cpp | 4 +- nano/node/bootstrap/bootstrap_attempt.cpp | 8 +- nano/node/distributed_work.cpp | 8 +- nano/node/node.cpp | 83 +---------------- nano/node/node.hpp | 9 +- nano/node/websocket.cpp | 103 ++++++++++++++++++++++ nano/node/websocket.hpp | 27 ++++++ 8 files changed, 197 insertions(+), 143 deletions(-) diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index 4accfa05..36a71a19 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -28,22 +28,22 @@ TEST (websocket, subscription_edge) config.websocket_config.port = nano::test::get_available_port (); auto node1 (system.add_node (config)); - ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + ASSERT_EQ (0, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); auto task = ([config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json"); client.await_ack (); - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json"); client.await_ack (); - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); client.send_message (R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json"); client.await_ack (); - EXPECT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (0, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); client.send_message (R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json"); client.await_ack (); - EXPECT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (0, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); }); auto future = std::async (std::launch::async, task); @@ -62,11 +62,11 @@ TEST (websocket, confirmation) std::atomic ack_ready{ false }; std::atomic unsubscribed{ false }; auto task = ([&ack_ready, &unsubscribed, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); auto response = client.get_response (); EXPECT_TRUE (response); boost::property_tree::ptree event; @@ -137,11 +137,11 @@ TEST (websocket, started_election) std::atomic ack_ready{ false }; auto task = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "started_election", "ack": "true"})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::started_election)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::started_election)); return client.get_response (); }); auto future = std::async (std::launch::async, task); @@ -185,11 +185,11 @@ TEST (websocket, stopped_election) std::atomic ack_ready{ false }; auto task = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "stopped_election", "ack": "true"})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::stopped_election)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::stopped_election)); return client.get_response (); }); auto future = std::async (std::launch::async, task); @@ -236,11 +236,11 @@ TEST (websocket, confirmation_options) std::atomic ack_ready{ false }; auto task1 = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "accounts": ["xrb_invalid"]}})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); auto response = client.get_response (1s); EXPECT_FALSE (response); }); @@ -275,11 +275,11 @@ TEST (websocket, confirmation_options) ack_ready = false; auto task2 = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true", "include_election_info": "true"}})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); return client.get_response (); }); auto future2 = std::async (std::launch::async, task2); @@ -337,11 +337,11 @@ TEST (websocket, confirmation_options) ack_ready = false; auto task3 = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true"}})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); auto response = client.get_response (1s); EXPECT_FALSE (response); }); @@ -379,11 +379,11 @@ TEST (websocket, confirmation_options_votes) std::atomic ack_ready{ false }; auto task1 = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "include_election_info_with_votes": "true", "include_block": "false"}})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); return client.get_response (); }); auto future1 = std::async (std::launch::async, task1); @@ -470,7 +470,7 @@ TEST (websocket, confirmation_options_sideband) client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "include_block": "false", "include_sideband_info": "true"}})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); return client.get_response (); }); auto future1 = std::async (std::launch::async, task1); @@ -537,23 +537,23 @@ TEST (websocket, confirmation_options_update) std::atomic added{ false }; std::atomic deleted{ false }; auto task = ([&added, &deleted, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); // Subscribe initially with empty options, everything will be filtered client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {}})json"); client.await_ack (); - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); // Now update filter with an account and wait for a response std::string add_message = boost::str (boost::format (R"json({"action": "update", "topic": "confirmation", "ack": "true", "options": {"accounts_add": ["%1%"]}})json") % nano::dev::genesis_key.pub.to_account ()); client.send_message (add_message); client.await_ack (); - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); added = true; EXPECT_TRUE (client.get_response ()); // Update the filter again, removing the account std::string delete_message = boost::str (boost::format (R"json({"action": "update", "topic": "confirmation", "ack": "true", "options": {"accounts_del": ["%1%"]}})json") % nano::dev::genesis_key.pub.to_account ()); client.send_message (delete_message); client.await_ack (); - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation)); deleted = true; EXPECT_FALSE (client.get_response (1s)); }); @@ -611,11 +611,11 @@ TEST (websocket, vote) std::atomic ack_ready{ false }; auto task = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": true})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::vote)); return client.get_response (); }); auto future = std::async (std::launch::async, task); @@ -661,11 +661,11 @@ TEST (websocket, vote_options_type) std::atomic ack_ready{ false }; auto task = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"include_replays": "true", "include_indeterminate": "false"}})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::vote)); return client.get_response (); }); auto future = std::async (std::launch::async, task); @@ -676,7 +676,7 @@ TEST (websocket, vote_options_type) auto vote (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector{ nano::dev::genesis->hash () })); nano::websocket::message_builder builder; auto msg (builder.vote_received (vote, nano::vote_code::replay)); - node1->websocket_server->broadcast (msg); + node1->websocket.server->broadcast (msg); ASSERT_TIMELY (5s, future.wait_for (0s) == std::future_status::ready); @@ -702,12 +702,12 @@ TEST (websocket, vote_options_representatives) std::atomic ack_ready{ false }; auto task1 = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); std::string message = boost::str (boost::format (R"json({"action": "subscribe", "topic": "vote", "ack": "true", "options": {"representatives": ["%1%"]}})json") % nano::dev::genesis_key.pub.to_account ()); client.send_message (message); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::vote)); auto response = client.get_response (); EXPECT_TRUE (response); boost::property_tree::ptree event; @@ -746,11 +746,11 @@ TEST (websocket, vote_options_representatives) ack_ready = false; auto task2 = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": "true", "options": {"representatives": ["xrb_invalid"]}})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::vote)); auto response = client.get_response (); // A list of invalid representatives is the same as no filter EXPECT_TRUE (response); @@ -775,23 +775,23 @@ TEST (websocket, work) config.websocket_config.port = nano::test::get_available_port (); auto node1 (system.add_node (config)); - ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::work)); + ASSERT_EQ (0, node1->websocket.server->subscriber_count (nano::websocket::topic::work)); // Subscribe to work and wait for response asynchronously std::atomic ack_ready{ false }; auto task = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "work", "ack": true})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::work)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::work)); return client.get_response (); }); auto future = std::async (std::launch::async, task); // Wait for acknowledge ASSERT_TIMELY (5s, ack_ready); - ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::work)); + ASSERT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::work)); // Generate work nano::block_hash hash{ 1 }; @@ -845,16 +845,16 @@ TEST (websocket, bootstrap) config.websocket_config.port = nano::test::get_available_port (); auto node1 (system.add_node (config)); - ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap)); + ASSERT_EQ (0, node1->websocket.server->subscriber_count (nano::websocket::topic::bootstrap)); // Subscribe to bootstrap and wait for response asynchronously std::atomic ack_ready{ false }; auto task = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::bootstrap)); return client.get_response (); }); auto future = std::async (std::launch::async, task); @@ -917,11 +917,11 @@ TEST (websocket, bootstrap_exited) // Subscribe to bootstrap and wait for response asynchronously std::atomic ack_ready{ false }; auto task = ([&ack_ready, config, &node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::bootstrap)); return client.get_response (); }); auto future = std::async (std::launch::async, task); @@ -961,7 +961,7 @@ TEST (websocket, ws_keepalive) auto node1 (system.add_node (config)); auto task = ([&node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "ping"})json"); client.await_ack (); }); @@ -990,11 +990,11 @@ TEST (websocket, telemetry) std::atomic done{ false }; auto task = ([config = node1->config, &node1, &done] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "telemetry", "ack": true})json"); client.await_ack (); done = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::telemetry)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::telemetry)); return client.get_response (); }); @@ -1027,7 +1027,7 @@ TEST (websocket, telemetry) ASSERT_EQ (contents.get ("port"), node2->network.endpoint ().port ()); // Other node should have no subscribers - EXPECT_EQ (0, node2->websocket_server->subscriber_count (nano::websocket::topic::telemetry)); + EXPECT_EQ (0, node2->websocket.server->subscriber_count (nano::websocket::topic::telemetry)); } TEST (websocket, new_unconfirmed_block) @@ -1040,11 +1040,11 @@ TEST (websocket, new_unconfirmed_block) std::atomic ack_ready{ false }; auto task = ([&ack_ready, config, node1] () { - fake_websocket_client client (node1->websocket_server->listening_port ()); + fake_websocket_client client (node1->websocket.server->listening_port ()); client.send_message (R"json({"action": "subscribe", "topic": "new_unconfirmed_block", "ack": "true"})json"); client.await_ack (); ack_ready = true; - EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::new_unconfirmed_block)); + EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::new_unconfirmed_block)); return client.get_response (); }); auto future = std::async (std::launch::async, task); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 7b890eac..746bd0be 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -320,9 +320,9 @@ void nano::block_processor::process_live (nano::transaction const & transaction_ node.network.flood_block (block_a, nano::buffer_drop_policy::limiter); } - if (node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::new_unconfirmed_block)) + if (node.websocket.server && node.websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block)) { - node.websocket_server->broadcast (nano::websocket::message_builder ().new_block_arrived (*block_a)); + node.websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (*block_a)); } } diff --git a/nano/node/bootstrap/bootstrap_attempt.cpp b/nano/node/bootstrap/bootstrap_attempt.cpp index 93bb94eb..fe48c302 100644 --- a/nano/node/bootstrap/bootstrap_attempt.cpp +++ b/nano/node/bootstrap/bootstrap_attempt.cpp @@ -22,10 +22,10 @@ nano::bootstrap_attempt::bootstrap_attempt (std::shared_ptr const & node->logger.always_log (boost::str (boost::format ("Starting %1% bootstrap attempt with ID %2%") % mode_text () % id)); node->bootstrap_initiator.notify_listeners (true); - if (node->websocket_server) + if (node->websocket.server) { nano::websocket::message_builder builder; - node->websocket_server->broadcast (builder.bootstrap_started (id, mode_text ())); + node->websocket.server->broadcast (builder.bootstrap_started (id, mode_text ())); } } @@ -33,10 +33,10 @@ nano::bootstrap_attempt::~bootstrap_attempt () { node->logger.always_log (boost::str (boost::format ("Exiting %1% bootstrap attempt with ID %2%") % mode_text () % id)); node->bootstrap_initiator.notify_listeners (false); - if (node->websocket_server) + if (node->websocket.server) { nano::websocket::message_builder builder; - node->websocket_server->broadcast (builder.bootstrap_exited (id, mode_text (), attempt_start, total_blocks)); + node->websocket.server->broadcast (builder.bootstrap_exited (id, mode_text (), attempt_start, total_blocks)); } } diff --git a/nano/node/distributed_work.cpp b/nano/node/distributed_work.cpp index f4842053..b710c415 100644 --- a/nano/node/distributed_work.cpp +++ b/nano/node/distributed_work.cpp @@ -37,20 +37,20 @@ nano::distributed_work::~distributed_work () debug_assert (status != work_generation_status::ongoing); if (auto node_l = node_w.lock ()) { - if (!node_l->stopped && node_l->websocket_server && node_l->websocket_server->any_subscriber (nano::websocket::topic::work)) + if (!node_l->stopped && node_l->websocket.server && node_l->websocket.server->any_subscriber (nano::websocket::topic::work)) { nano::websocket::message_builder builder; if (status == work_generation_status::success) { - node_l->websocket_server->broadcast (builder.work_generation (request.version, request.root.as_block_hash (), work_result, request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), winner, bad_peers)); + node_l->websocket.server->broadcast (builder.work_generation (request.version, request.root.as_block_hash (), work_result, request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), winner, bad_peers)); } else if (status == work_generation_status::cancelled) { - node_l->websocket_server->broadcast (builder.work_cancelled (request.version, request.root.as_block_hash (), request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), bad_peers)); + node_l->websocket.server->broadcast (builder.work_cancelled (request.version, request.root.as_block_hash (), request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), bad_peers)); } else if (status == work_generation_status::failure_local || status == work_generation_status::failure_peers) { - node_l->websocket_server->broadcast (builder.work_failed (request.version, request.root.as_block_hash (), request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), bad_peers)); + node_l->websocket.server->broadcast (builder.work_failed (request.version, request.root.as_block_hash (), request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), bad_peers)); } } stop_once (true); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 9e71ad25..07e7dabe 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -200,6 +200,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co aggregator (config, stats, generator, final_generator, history, ledger, wallets, active), wallets (wallets_store.init_error (), *this), backlog{ nano::backlog_population_config (config), store, stats }, + websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger }, startup_time (std::chrono::steady_clock::now ()), node_seq (seq) { @@ -226,13 +227,6 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co hinting.notify (); }; - if (config.websocket_config.enabled) - { - auto endpoint_l (nano::tcp_endpoint (boost::asio::ip::make_address_v6 (config.websocket_config.address), config.websocket_config.port)); - websocket_server = std::make_shared (config.websocket_config.tls_config, logger, wallets, io_ctx, endpoint_l); - this->websocket_server->run (); - } - wallets.observer = [this] (bool active) { observers.wallet.notify (active); }; @@ -306,64 +300,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co } }); } - if (websocket_server) - { - observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { - debug_assert (status_a.type != nano::election_status_type::ongoing); - if (this->websocket_server->any_subscriber (nano::websocket::topic::confirmation)) - { - auto block_a (status_a.winner); - std::string subtype; - if (is_state_send_a) - { - subtype = "send"; - } - else if (block_a->type () == nano::block_type::state) - { - if (block_a->link ().is_zero ()) - { - subtype = "change"; - } - else if (is_state_epoch_a) - { - debug_assert (amount_a == 0 && this->ledger.is_epoch_link (block_a->link ())); - subtype = "epoch"; - } - else - { - subtype = "receive"; - } - } - - this->websocket_server->broadcast_confirmation (block_a, account_a, amount_a, subtype, status_a, votes_a); - } - }); - - observers.active_started.add ([this] (nano::block_hash const & hash_a) { - if (this->websocket_server->any_subscriber (nano::websocket::topic::started_election)) - { - nano::websocket::message_builder builder; - this->websocket_server->broadcast (builder.started_election (hash_a)); - } - }); - - observers.active_stopped.add ([this] (nano::block_hash const & hash_a) { - if (this->websocket_server->any_subscriber (nano::websocket::topic::stopped_election)) - { - nano::websocket::message_builder builder; - this->websocket_server->broadcast (builder.stopped_election (hash_a)); - } - }); - - observers.telemetry.add ([this] (nano::telemetry_data const & telemetry_data, nano::endpoint const & endpoint) { - if (this->websocket_server->any_subscriber (nano::websocket::topic::telemetry)) - { - nano::websocket::message_builder builder; - this->websocket_server->broadcast (builder.telemetry_received (telemetry_data, endpoint)); - } - }); - } // Add block confirmation type stats regardless of http-callback and websocket subscriptions observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { debug_assert (status_a.type != nano::election_status_type::ongoing); @@ -406,17 +343,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co this->gap_cache.vote (vote_a); } }); - if (websocket_server) - { - observers.vote.add ([this] (std::shared_ptr vote_a, std::shared_ptr const & channel_a, nano::vote_code code_a) { - if (this->websocket_server->any_subscriber (nano::websocket::topic::vote)) - { - nano::websocket::message_builder builder; - auto msg (builder.vote_received (vote_a, code_a)); - this->websocket_server->broadcast (msg); - } - }); - } + // Cancelling local work generation observers.work_cancel.add ([this] (nano::root const & root_a) { this->work.cancel (root_a); @@ -771,6 +698,7 @@ void nano::node::start () backlog.start (); hinting.start (); bootstrap_server.start (); + websocket.start (); } void nano::node::stop () @@ -794,10 +722,7 @@ void nano::node::stop () confirmation_height_processor.stop (); network.stop (); telemetry->stop (); - if (websocket_server) - { - websocket_server->stop (); - } + websocket.stop (); bootstrap_server.stop (); bootstrap_initiator.stop (); tcp_listener.stop (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 4f204bf0..dba1fe4e 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -47,10 +48,6 @@ namespace nano namespace rocksdb { } // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice -namespace websocket -{ - class listener; -} class node; class telemetry; class work_pool; @@ -69,6 +66,8 @@ public: node (boost::asio::io_context &, uint16_t, boost::filesystem::path const &, nano::logging const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0); node (boost::asio::io_context &, boost::filesystem::path const &, nano::node_config const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0); ~node (); + +public: template void background (T action_a) { @@ -144,7 +143,6 @@ public: nano::stat stats; nano::thread_pool workers; nano::thread_pool bootstrap_workers; - std::shared_ptr websocket_server; nano::node_flags flags; nano::work_pool & work; nano::distributed_work_factory distributed_work; @@ -186,6 +184,7 @@ public: nano::request_aggregator aggregator; nano::wallets wallets; nano::backlog_population backlog; + nano::websocket_server websocket; std::chrono::steady_clock::time_point const startup_time; std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 7489ba46..89ec692d 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -961,3 +962,105 @@ std::string nano::websocket::message::to_string () const ostream.flush (); return ostream.str (); } + +/* + * websocket_server + */ + +nano::websocket_server::websocket_server (nano::websocket::config & config_a, nano::node_observers & observers_a, nano::wallets & wallets_a, nano::ledger & ledger_a, boost::asio::io_context & io_ctx_a, nano::logger_mt & logger_a) : + config{ config_a }, + observers{ observers_a }, + wallets{ wallets_a }, + ledger{ ledger_a }, + io_ctx{ io_ctx_a }, + logger{ logger_a } +{ + if (!config.enabled) + { + return; + } + + auto endpoint = nano::tcp_endpoint{ boost::asio::ip::make_address_v6 (config.address), config.port }; + server = std::make_shared (config.tls_config, logger, wallets, io_ctx, endpoint); + + observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { + debug_assert (status_a.type != nano::election_status_type::ongoing); + + if (server->any_subscriber (nano::websocket::topic::confirmation)) + { + auto block_a = status_a.winner; + std::string subtype; + if (is_state_send_a) + { + subtype = "send"; + } + else if (block_a->type () == nano::block_type::state) + { + if (block_a->link ().is_zero ()) + { + subtype = "change"; + } + else if (is_state_epoch_a) + { + debug_assert (amount_a == 0 && ledger.is_epoch_link (block_a->link ())); + subtype = "epoch"; + } + else + { + subtype = "receive"; + } + } + + server->broadcast_confirmation (block_a, account_a, amount_a, subtype, status_a, votes_a); + } + }); + + observers.active_started.add ([this] (nano::block_hash const & hash_a) { + if (server->any_subscriber (nano::websocket::topic::started_election)) + { + nano::websocket::message_builder builder; + server->broadcast (builder.started_election (hash_a)); + } + }); + + observers.active_stopped.add ([this] (nano::block_hash const & hash_a) { + if (server->any_subscriber (nano::websocket::topic::stopped_election)) + { + nano::websocket::message_builder builder; + server->broadcast (builder.stopped_election (hash_a)); + } + }); + + observers.telemetry.add ([this] (nano::telemetry_data const & telemetry_data, nano::endpoint const & endpoint) { + if (server->any_subscriber (nano::websocket::topic::telemetry)) + { + nano::websocket::message_builder builder; + server->broadcast (builder.telemetry_received (telemetry_data, endpoint)); + } + }); + + observers.vote.add ([this] (std::shared_ptr vote_a, std::shared_ptr const & channel_a, nano::vote_code code_a) { + if (server->any_subscriber (nano::websocket::topic::vote)) + { + nano::websocket::message_builder builder; + auto msg{ builder.vote_received (vote_a, code_a) }; + server->broadcast (msg); + } + }); +} + +void nano::websocket_server::start () +{ + if (server) + { + server->run (); + } +} + +void nano::websocket_server::stop () +{ + if (server) + { + server->stop (); + } +} \ No newline at end of file diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index f3bc9fc2..9d197dcf 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -25,7 +26,9 @@ class vote; class election_status; class telemetry_data; class tls_config; +class node_observers; enum class election_status_type : uint8_t; + namespace websocket { class listener; @@ -353,4 +356,28 @@ namespace websocket std::atomic stopped{ false }; }; } + +/** + * Wrapper of websocket related functionality that node interacts with + */ +class websocket_server +{ +public: + websocket_server (nano::websocket::config &, nano::node_observers &, nano::wallets &, nano::ledger &, boost::asio::io_context &, nano::logger_mt &); + + void start (); + void stop (); + +private: // Dependencies + nano::websocket::config const & config; + nano::node_observers & observers; + nano::wallets & wallets; + nano::ledger & ledger; + boost::asio::io_context & io_ctx; + nano::logger_mt & logger; + +public: + // TODO: Encapsulate, this is public just because existing code needs it + std::shared_ptr server; +}; }