Move websocket initialization code out of node constructor (#4068)

This commit is contained in:
Piotr Wójcik 2023-01-24 02:06:25 +01:00 committed by GitHub
commit 6c90bc220c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 197 additions and 143 deletions

View file

@ -28,22 +28,22 @@ TEST (websocket, subscription_edge)
config.websocket_config.port = nano::test::get_available_port ();
auto node1 (system.add_node (config));
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
ASSERT_EQ (0, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
auto task = ([config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
client.send_message (R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
EXPECT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (0, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
client.send_message (R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
EXPECT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (0, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
});
auto future = std::async (std::launch::async, task);
@ -62,11 +62,11 @@ TEST (websocket, confirmation)
std::atomic<bool> ack_ready{ false };
std::atomic<bool> unsubscribed{ false };
auto task = ([&ack_ready, &unsubscribed, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
auto response = client.get_response ();
EXPECT_TRUE (response);
boost::property_tree::ptree event;
@ -137,11 +137,11 @@ TEST (websocket, started_election)
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "started_election", "ack": "true"})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::started_election));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::started_election));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);
@ -185,11 +185,11 @@ TEST (websocket, stopped_election)
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "stopped_election", "ack": "true"})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::stopped_election));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::stopped_election));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);
@ -236,11 +236,11 @@ TEST (websocket, confirmation_options)
std::atomic<bool> ack_ready{ false };
auto task1 = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "accounts": ["xrb_invalid"]}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
auto response = client.get_response (1s);
EXPECT_FALSE (response);
});
@ -275,11 +275,11 @@ TEST (websocket, confirmation_options)
ack_ready = false;
auto task2 = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true", "include_election_info": "true"}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
return client.get_response ();
});
auto future2 = std::async (std::launch::async, task2);
@ -337,11 +337,11 @@ TEST (websocket, confirmation_options)
ack_ready = false;
auto task3 = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true"}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
auto response = client.get_response (1s);
EXPECT_FALSE (response);
});
@ -379,11 +379,11 @@ TEST (websocket, confirmation_options_votes)
std::atomic<bool> ack_ready{ false };
auto task1 = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "include_election_info_with_votes": "true", "include_block": "false"}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
return client.get_response ();
});
auto future1 = std::async (std::launch::async, task1);
@ -470,7 +470,7 @@ TEST (websocket, confirmation_options_sideband)
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "include_block": "false", "include_sideband_info": "true"}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
return client.get_response ();
});
auto future1 = std::async (std::launch::async, task1);
@ -537,23 +537,23 @@ TEST (websocket, confirmation_options_update)
std::atomic<bool> added{ false };
std::atomic<bool> deleted{ false };
auto task = ([&added, &deleted, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
// Subscribe initially with empty options, everything will be filtered
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {}})json");
client.await_ack ();
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
// Now update filter with an account and wait for a response
std::string add_message = boost::str (boost::format (R"json({"action": "update", "topic": "confirmation", "ack": "true", "options": {"accounts_add": ["%1%"]}})json") % nano::dev::genesis_key.pub.to_account ());
client.send_message (add_message);
client.await_ack ();
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
added = true;
EXPECT_TRUE (client.get_response ());
// Update the filter again, removing the account
std::string delete_message = boost::str (boost::format (R"json({"action": "update", "topic": "confirmation", "ack": "true", "options": {"accounts_del": ["%1%"]}})json") % nano::dev::genesis_key.pub.to_account ());
client.send_message (delete_message);
client.await_ack ();
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation));
deleted = true;
EXPECT_FALSE (client.get_response (1s));
});
@ -611,11 +611,11 @@ TEST (websocket, vote)
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::vote));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);
@ -661,11 +661,11 @@ TEST (websocket, vote_options_type)
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"include_replays": "true", "include_indeterminate": "false"}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::vote));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);
@ -676,7 +676,7 @@ TEST (websocket, vote_options_type)
auto vote (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () }));
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote, nano::vote_code::replay));
node1->websocket_server->broadcast (msg);
node1->websocket.server->broadcast (msg);
ASSERT_TIMELY (5s, future.wait_for (0s) == std::future_status::ready);
@ -702,12 +702,12 @@ TEST (websocket, vote_options_representatives)
std::atomic<bool> ack_ready{ false };
auto task1 = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
std::string message = boost::str (boost::format (R"json({"action": "subscribe", "topic": "vote", "ack": "true", "options": {"representatives": ["%1%"]}})json") % nano::dev::genesis_key.pub.to_account ());
client.send_message (message);
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::vote));
auto response = client.get_response ();
EXPECT_TRUE (response);
boost::property_tree::ptree event;
@ -746,11 +746,11 @@ TEST (websocket, vote_options_representatives)
ack_ready = false;
auto task2 = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "vote", "ack": "true", "options": {"representatives": ["xrb_invalid"]}})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::vote));
auto response = client.get_response ();
// A list of invalid representatives is the same as no filter
EXPECT_TRUE (response);
@ -775,23 +775,23 @@ TEST (websocket, work)
config.websocket_config.port = nano::test::get_available_port ();
auto node1 (system.add_node (config));
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::work));
ASSERT_EQ (0, node1->websocket.server->subscriber_count (nano::websocket::topic::work));
// Subscribe to work and wait for response asynchronously
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "work", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::work));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::work));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);
// Wait for acknowledge
ASSERT_TIMELY (5s, ack_ready);
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::work));
ASSERT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::work));
// Generate work
nano::block_hash hash{ 1 };
@ -845,16 +845,16 @@ TEST (websocket, bootstrap)
config.websocket_config.port = nano::test::get_available_port ();
auto node1 (system.add_node (config));
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
ASSERT_EQ (0, node1->websocket.server->subscriber_count (nano::websocket::topic::bootstrap));
// Subscribe to bootstrap and wait for response asynchronously
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::bootstrap));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);
@ -917,11 +917,11 @@ TEST (websocket, bootstrap_exited)
// Subscribe to bootstrap and wait for response asynchronously
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, &node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::bootstrap));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);
@ -961,7 +961,7 @@ TEST (websocket, ws_keepalive)
auto node1 (system.add_node (config));
auto task = ([&node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "ping"})json");
client.await_ack ();
});
@ -990,11 +990,11 @@ TEST (websocket, telemetry)
std::atomic<bool> done{ false };
auto task = ([config = node1->config, &node1, &done] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "telemetry", "ack": true})json");
client.await_ack ();
done = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::telemetry));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::telemetry));
return client.get_response ();
});
@ -1027,7 +1027,7 @@ TEST (websocket, telemetry)
ASSERT_EQ (contents.get<uint16_t> ("port"), node2->network.endpoint ().port ());
// Other node should have no subscribers
EXPECT_EQ (0, node2->websocket_server->subscriber_count (nano::websocket::topic::telemetry));
EXPECT_EQ (0, node2->websocket.server->subscriber_count (nano::websocket::topic::telemetry));
}
TEST (websocket, new_unconfirmed_block)
@ -1040,11 +1040,11 @@ TEST (websocket, new_unconfirmed_block)
std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, node1] () {
fake_websocket_client client (node1->websocket_server->listening_port ());
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "new_unconfirmed_block", "ack": "true"})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::new_unconfirmed_block));
EXPECT_EQ (1, node1->websocket.server->subscriber_count (nano::websocket::topic::new_unconfirmed_block));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);

View file

@ -320,9 +320,9 @@ void nano::block_processor::process_live (nano::transaction const & transaction_
node.network.flood_block (block_a, nano::buffer_drop_policy::limiter);
}
if (node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
if (node.websocket.server && node.websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
{
node.websocket_server->broadcast (nano::websocket::message_builder ().new_block_arrived (*block_a));
node.websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (*block_a));
}
}

View file

@ -22,10 +22,10 @@ nano::bootstrap_attempt::bootstrap_attempt (std::shared_ptr<nano::node> const &
node->logger.always_log (boost::str (boost::format ("Starting %1% bootstrap attempt with ID %2%") % mode_text () % id));
node->bootstrap_initiator.notify_listeners (true);
if (node->websocket_server)
if (node->websocket.server)
{
nano::websocket::message_builder builder;
node->websocket_server->broadcast (builder.bootstrap_started (id, mode_text ()));
node->websocket.server->broadcast (builder.bootstrap_started (id, mode_text ()));
}
}
@ -33,10 +33,10 @@ nano::bootstrap_attempt::~bootstrap_attempt ()
{
node->logger.always_log (boost::str (boost::format ("Exiting %1% bootstrap attempt with ID %2%") % mode_text () % id));
node->bootstrap_initiator.notify_listeners (false);
if (node->websocket_server)
if (node->websocket.server)
{
nano::websocket::message_builder builder;
node->websocket_server->broadcast (builder.bootstrap_exited (id, mode_text (), attempt_start, total_blocks));
node->websocket.server->broadcast (builder.bootstrap_exited (id, mode_text (), attempt_start, total_blocks));
}
}

View file

@ -37,20 +37,20 @@ nano::distributed_work::~distributed_work ()
debug_assert (status != work_generation_status::ongoing);
if (auto node_l = node_w.lock ())
{
if (!node_l->stopped && node_l->websocket_server && node_l->websocket_server->any_subscriber (nano::websocket::topic::work))
if (!node_l->stopped && node_l->websocket.server && node_l->websocket.server->any_subscriber (nano::websocket::topic::work))
{
nano::websocket::message_builder builder;
if (status == work_generation_status::success)
{
node_l->websocket_server->broadcast (builder.work_generation (request.version, request.root.as_block_hash (), work_result, request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), winner, bad_peers));
node_l->websocket.server->broadcast (builder.work_generation (request.version, request.root.as_block_hash (), work_result, request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), winner, bad_peers));
}
else if (status == work_generation_status::cancelled)
{
node_l->websocket_server->broadcast (builder.work_cancelled (request.version, request.root.as_block_hash (), request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), bad_peers));
node_l->websocket.server->broadcast (builder.work_cancelled (request.version, request.root.as_block_hash (), request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), bad_peers));
}
else if (status == work_generation_status::failure_local || status == work_generation_status::failure_peers)
{
node_l->websocket_server->broadcast (builder.work_failed (request.version, request.root.as_block_hash (), request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), bad_peers));
node_l->websocket.server->broadcast (builder.work_failed (request.version, request.root.as_block_hash (), request.difficulty, node_l->default_difficulty (request.version), elapsed.value (), bad_peers));
}
}
stop_once (true);

View file

@ -200,6 +200,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
aggregator (config, stats, generator, final_generator, history, ledger, wallets, active),
wallets (wallets_store.init_error (), *this),
backlog{ nano::backlog_population_config (config), store, stats },
websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq)
{
@ -226,13 +227,6 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
hinting.notify ();
};
if (config.websocket_config.enabled)
{
auto endpoint_l (nano::tcp_endpoint (boost::asio::ip::make_address_v6 (config.websocket_config.address), config.websocket_config.port));
websocket_server = std::make_shared<nano::websocket::listener> (config.websocket_config.tls_config, logger, wallets, io_ctx, endpoint_l);
this->websocket_server->run ();
}
wallets.observer = [this] (bool active) {
observers.wallet.notify (active);
};
@ -306,64 +300,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
}
});
}
if (websocket_server)
{
observers.blocks.add ([this] (nano::election_status const & status_a, std::vector<nano::vote_with_weight_info> const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) {
debug_assert (status_a.type != nano::election_status_type::ongoing);
if (this->websocket_server->any_subscriber (nano::websocket::topic::confirmation))
{
auto block_a (status_a.winner);
std::string subtype;
if (is_state_send_a)
{
subtype = "send";
}
else if (block_a->type () == nano::block_type::state)
{
if (block_a->link ().is_zero ())
{
subtype = "change";
}
else if (is_state_epoch_a)
{
debug_assert (amount_a == 0 && this->ledger.is_epoch_link (block_a->link ()));
subtype = "epoch";
}
else
{
subtype = "receive";
}
}
this->websocket_server->broadcast_confirmation (block_a, account_a, amount_a, subtype, status_a, votes_a);
}
});
observers.active_started.add ([this] (nano::block_hash const & hash_a) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::started_election))
{
nano::websocket::message_builder builder;
this->websocket_server->broadcast (builder.started_election (hash_a));
}
});
observers.active_stopped.add ([this] (nano::block_hash const & hash_a) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::stopped_election))
{
nano::websocket::message_builder builder;
this->websocket_server->broadcast (builder.stopped_election (hash_a));
}
});
observers.telemetry.add ([this] (nano::telemetry_data const & telemetry_data, nano::endpoint const & endpoint) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::telemetry))
{
nano::websocket::message_builder builder;
this->websocket_server->broadcast (builder.telemetry_received (telemetry_data, endpoint));
}
});
}
// Add block confirmation type stats regardless of http-callback and websocket subscriptions
observers.blocks.add ([this] (nano::election_status const & status_a, std::vector<nano::vote_with_weight_info> const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) {
debug_assert (status_a.type != nano::election_status_type::ongoing);
@ -406,17 +343,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
this->gap_cache.vote (vote_a);
}
});
if (websocket_server)
{
observers.vote.add ([this] (std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> const & channel_a, nano::vote_code code_a) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::vote))
{
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote_a, code_a));
this->websocket_server->broadcast (msg);
}
});
}
// Cancelling local work generation
observers.work_cancel.add ([this] (nano::root const & root_a) {
this->work.cancel (root_a);
@ -771,6 +698,7 @@ void nano::node::start ()
backlog.start ();
hinting.start ();
bootstrap_server.start ();
websocket.start ();
}
void nano::node::stop ()
@ -794,10 +722,7 @@ void nano::node::stop ()
confirmation_height_processor.stop ();
network.stop ();
telemetry->stop ();
if (websocket_server)
{
websocket_server->stop ();
}
websocket.stop ();
bootstrap_server.stop ();
bootstrap_initiator.stop ();
tcp_listener.stop ();

View file

@ -31,6 +31,7 @@
#include <nano/node/vote_cache.hpp>
#include <nano/node/vote_processor.hpp>
#include <nano/node/wallet.hpp>
#include <nano/node/websocket.hpp>
#include <nano/node/write_database_queue.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/utility.hpp>
@ -47,10 +48,6 @@ namespace nano
namespace rocksdb
{
} // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice
namespace websocket
{
class listener;
}
class node;
class telemetry;
class work_pool;
@ -69,6 +66,8 @@ public:
node (boost::asio::io_context &, uint16_t, boost::filesystem::path const &, nano::logging const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0);
node (boost::asio::io_context &, boost::filesystem::path const &, nano::node_config const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0);
~node ();
public:
template <typename T>
void background (T action_a)
{
@ -144,7 +143,6 @@ public:
nano::stat stats;
nano::thread_pool workers;
nano::thread_pool bootstrap_workers;
std::shared_ptr<nano::websocket::listener> websocket_server;
nano::node_flags flags;
nano::work_pool & work;
nano::distributed_work_factory distributed_work;
@ -186,6 +184,7 @@ public:
nano::request_aggregator aggregator;
nano::wallets wallets;
nano::backlog_population backlog;
nano::websocket_server websocket;
std::chrono::steady_clock::time_point const startup_time;
std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week

View file

@ -3,6 +3,7 @@
#include <nano/boost/asio/strand.hpp>
#include <nano/lib/tlsconfig.hpp>
#include <nano/lib/work.hpp>
#include <nano/node/node_observers.hpp>
#include <nano/node/transport/transport.hpp>
#include <nano/node/wallet.hpp>
#include <nano/node/websocket.hpp>
@ -961,3 +962,105 @@ std::string nano::websocket::message::to_string () const
ostream.flush ();
return ostream.str ();
}
/*
* websocket_server
*/
nano::websocket_server::websocket_server (nano::websocket::config & config_a, nano::node_observers & observers_a, nano::wallets & wallets_a, nano::ledger & ledger_a, boost::asio::io_context & io_ctx_a, nano::logger_mt & logger_a) :
config{ config_a },
observers{ observers_a },
wallets{ wallets_a },
ledger{ ledger_a },
io_ctx{ io_ctx_a },
logger{ logger_a }
{
if (!config.enabled)
{
return;
}
auto endpoint = nano::tcp_endpoint{ boost::asio::ip::make_address_v6 (config.address), config.port };
server = std::make_shared<nano::websocket::listener> (config.tls_config, logger, wallets, io_ctx, endpoint);
observers.blocks.add ([this] (nano::election_status const & status_a, std::vector<nano::vote_with_weight_info> const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) {
debug_assert (status_a.type != nano::election_status_type::ongoing);
if (server->any_subscriber (nano::websocket::topic::confirmation))
{
auto block_a = status_a.winner;
std::string subtype;
if (is_state_send_a)
{
subtype = "send";
}
else if (block_a->type () == nano::block_type::state)
{
if (block_a->link ().is_zero ())
{
subtype = "change";
}
else if (is_state_epoch_a)
{
debug_assert (amount_a == 0 && ledger.is_epoch_link (block_a->link ()));
subtype = "epoch";
}
else
{
subtype = "receive";
}
}
server->broadcast_confirmation (block_a, account_a, amount_a, subtype, status_a, votes_a);
}
});
observers.active_started.add ([this] (nano::block_hash const & hash_a) {
if (server->any_subscriber (nano::websocket::topic::started_election))
{
nano::websocket::message_builder builder;
server->broadcast (builder.started_election (hash_a));
}
});
observers.active_stopped.add ([this] (nano::block_hash const & hash_a) {
if (server->any_subscriber (nano::websocket::topic::stopped_election))
{
nano::websocket::message_builder builder;
server->broadcast (builder.stopped_election (hash_a));
}
});
observers.telemetry.add ([this] (nano::telemetry_data const & telemetry_data, nano::endpoint const & endpoint) {
if (server->any_subscriber (nano::websocket::topic::telemetry))
{
nano::websocket::message_builder builder;
server->broadcast (builder.telemetry_received (telemetry_data, endpoint));
}
});
observers.vote.add ([this] (std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> const & channel_a, nano::vote_code code_a) {
if (server->any_subscriber (nano::websocket::topic::vote))
{
nano::websocket::message_builder builder;
auto msg{ builder.vote_received (vote_a, code_a) };
server->broadcast (msg);
}
});
}
void nano::websocket_server::start ()
{
if (server)
{
server->run ();
}
}
void nano::websocket_server::stop ()
{
if (server)
{
server->stop ();
}
}

View file

@ -6,6 +6,7 @@
#include <nano/node/common.hpp>
#include <nano/node/election.hpp>
#include <nano/node/websocket_stream.hpp>
#include <nano/node/websocketconfig.hpp>
#include <nano/secure/common.hpp>
#include <boost/property_tree/json_parser.hpp>
@ -25,7 +26,9 @@ class vote;
class election_status;
class telemetry_data;
class tls_config;
class node_observers;
enum class election_status_type : uint8_t;
namespace websocket
{
class listener;
@ -353,4 +356,28 @@ namespace websocket
std::atomic<bool> stopped{ false };
};
}
/**
* Wrapper of websocket related functionality that node interacts with
*/
class websocket_server
{
public:
websocket_server (nano::websocket::config &, nano::node_observers &, nano::wallets &, nano::ledger &, boost::asio::io_context &, nano::logger_mt &);
void start ();
void stop ();
private: // Dependencies
nano::websocket::config const & config;
nano::node_observers & observers;
nano::wallets & wallets;
nano::ledger & ledger;
boost::asio::io_context & io_ctx;
nano::logger_mt & logger;
public:
// TODO: Encapsulate, this is public just because existing code needs it
std::shared_ptr<nano::websocket::listener> server;
};
}