Websocket bootstrap subscription (#2471)

* Bootstrap attempt ID
* Websocket bootstrap subscription
This commit is contained in:
Sergey Kroshnin 2020-01-16 12:11:14 +03:00 committed by GitHub
commit 796f9f6d88
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 288 additions and 34 deletions

View file

@ -521,7 +521,48 @@ TEST (bootstrap_processor, lazy_hash)
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
node1->network.udp_channels.insert (system.nodes[0]->network.endpoint (), node1->network_params.protocol.protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash ());
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true);
{
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
ASSERT_EQ (receive2->hash ().to_string (), attempt->id);
}
// Check processed blocks
system.deadline_set (10s);
while (node1->balance (key2.pub) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
node1->stop ();
}
TEST (bootstrap_processor, lazy_hash_bootstrap_id)
{
nano::system system (1);
auto node0 (system.nodes[0]);
nano::genesis genesis;
nano::keypair key1;
nano::keypair key2;
// Generating test chain
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node0->work_generate_blocking (genesis.hash ())));
auto receive1 (std::make_shared<nano::state_block> (key1.pub, 0, key1.pub, nano::Gxrb_ratio, send1->hash (), key1.prv, key1.pub, *node0->work_generate_blocking (key1.pub)));
auto send2 (std::make_shared<nano::state_block> (key1.pub, receive1->hash (), key1.pub, 0, key2.pub, key1.prv, key1.pub, *node0->work_generate_blocking (receive1->hash ())));
auto receive2 (std::make_shared<nano::state_block> (key2.pub, 0, key2.pub, nano::Gxrb_ratio, send2->hash (), key2.prv, key2.pub, *node0->work_generate_blocking (key2.pub)));
// Processing test chain
node0->block_processor.add (send1);
node0->block_processor.add (receive1);
node0->block_processor.add (send2);
node0->block_processor.add (receive2);
node0->block_processor.flush ();
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true, true, "123456");
{
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
ASSERT_EQ ("123456", attempt->id);
}
// Check processed blocks
system.deadline_set (10s);
while (node1->balance (key2.pub) == 0)
@ -695,6 +736,11 @@ TEST (bootstrap_processor, wallet_lazy_frontier)
ASSERT_NE (nullptr, wallet);
wallet->insert_adhoc (key2.prv);
node1->bootstrap_wallet ();
{
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
ASSERT_EQ (key2.pub.to_account (), attempt->id);
}
// Check processed blocks
system.deadline_set (10s);
while (!node1->ledger.block_exists (receive2->hash ()))

View file

@ -805,6 +805,141 @@ TEST (websocket, work)
ASSERT_EQ (contents.get<std::string> ("reason"), "");
}
// Test client subscribing to notifications for bootstrap
TEST (websocket, bootstrap)
{
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
// Subscribe to bootstrap and wait for response asynchronously
ack_ready = false;
auto client_task = ([config]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json", true, true);
return response;
});
auto client_future = std::async (std::launch::async, client_task);
// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
// Start bootsrap attempt
node1->bootstrap_initiator.bootstrap (true, "123abc");
ASSERT_NE (nullptr, node1->bootstrap_initiator.current_attempt ());
// Wait for the bootstrap notification
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
// Check the bootstrap notification message
auto response = client_future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
boost::property_tree::ptree event;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "bootstrap");
auto & contents = event.get_child ("message");
ASSERT_EQ (contents.get<std::string> ("reason"), "started");
ASSERT_EQ (contents.get<std::string> ("id"), "123abc");
ASSERT_EQ (contents.get<std::string> ("mode"), "legacy");
// Wait for bootstrap finish
system.deadline_set (5s);
while (node1->bootstrap_initiator.in_progress ())
{
ASSERT_NO_ERROR (system.poll ());
}
}
TEST (websocket, bootstrap_excited)
{
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
// Start bootstrap, exit after subscription
std::atomic<bool> bootstrap_started{ false };
std::atomic<bool> subscribed{ false };
std::thread bootstrap_thread ([&system, node1, &bootstrap_started, &subscribed](){
node1->bootstrap_initiator.bootstrap (true, "123abc");
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
bootstrap_started = true;
system.deadline_set (5s);
while (!subscribed)
{
ASSERT_NO_ERROR (system.poll ());
}
});
// Wait for bootstrap start
system.deadline_set (5s);
while (!bootstrap_started)
{
ASSERT_NO_ERROR (system.poll ());
}
// Subscribe to bootstrap and wait for response asynchronously
ack_ready = false;
auto client_task = ([config]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json", true, true);
return response;
});
auto client_future = std::async (std::launch::async, client_task);
// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));
// Wait for the bootstrap notification
subscribed = true;
bootstrap_thread.join ();
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
// Check the bootstrap notification message
auto response = client_future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
boost::property_tree::ptree event;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "bootstrap");
auto & contents = event.get_child ("message");
ASSERT_EQ (contents.get<std::string> ("reason"), "exited");
ASSERT_EQ (contents.get<std::string> ("id"), "123abc");
ASSERT_EQ (contents.get<std::string> ("mode"), "legacy");
ASSERT_EQ (contents.get<unsigned> ("total_blocks"), 0);
ASSERT_LT (contents.get<unsigned> ("duration"), 15000);
}
/** Tests clients subscribing multiple times or unsubscribing without a subscription */
TEST (websocket, ws_keepalive)
{

View file

@ -7,6 +7,7 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/node/websocket.hpp>
#include <boost/format.hpp>
@ -75,19 +76,35 @@ std::shared_ptr<nano::bootstrap_client> nano::bootstrap_client::shared ()
return shared_from_this ();
}
nano::bootstrap_attempt::bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a) :
nano::bootstrap_attempt::bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a, std::string id_a) :
next_log (std::chrono::steady_clock::now ()),
node (node_a),
mode (mode_a)
mode (mode_a),
id (id_a)
{
node->logger.always_log ("Starting bootstrap attempt");
if (id.empty ())
{
nano::random_constants constants;
id = constants.random_128.to_string ();
}
node->logger.always_log (boost::str (boost::format ("Starting bootstrap attempt id %1%") % id));
node->bootstrap_initiator.notify_listeners (true);
if (node->websocket_server)
{
nano::websocket::message_builder builder;
node->websocket_server->broadcast (builder.bootstrap_started (id, mode_text ()));
}
}
nano::bootstrap_attempt::~bootstrap_attempt ()
{
node->logger.always_log ("Exiting bootstrap attempt");
node->logger.always_log (boost::str (boost::format ("Exiting bootstrap attempt id %1%") % id));
node->bootstrap_initiator.notify_listeners (false);
if (node->websocket_server)
{
nano::websocket::message_builder builder;
node->websocket_server->broadcast (builder.bootstrap_exited (id, mode_text (), attempt_start, total_blocks));
}
}
bool nano::bootstrap_attempt::should_log ()
@ -794,6 +811,24 @@ bool nano::bootstrap_attempt::confirm_frontiers (nano::unique_lock<std::mutex> &
return confirmed;
}
std::string nano::bootstrap_attempt::mode_text ()
{
std::string mode_text;
if (mode == nano::bootstrap_mode::legacy)
{
mode_text = "legacy";
}
else if (mode == nano::bootstrap_mode::lazy)
{
mode_text = "lazy";
}
else if (mode == nano::bootstrap_mode::wallet_lazy)
{
mode_text = "wallet_lazy";
}
return mode_text;
}
void nano::bootstrap_attempt::lazy_start (nano::hash_or_account const & hash_or_account_a, bool confirmed)
{
nano::lock_guard<std::mutex> lazy_lock (lazy_mutex);
@ -1339,7 +1374,7 @@ nano::bootstrap_initiator::~bootstrap_initiator ()
stop ();
}
void nano::bootstrap_initiator::bootstrap (bool force)
void nano::bootstrap_initiator::bootstrap (bool force, std::string id_a)
{
nano::unique_lock<std::mutex> lock (mutex);
if (force && attempt != nullptr)
@ -1352,12 +1387,12 @@ void nano::bootstrap_initiator::bootstrap (bool force)
if (!stopped && attempt == nullptr)
{
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate, nano::stat::dir::out);
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared ());
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::legacy, id_a);
condition.notify_all ();
}
}
void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bool add_to_peers, bool frontiers_confirmed)
void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bool add_to_peers, bool frontiers_confirmed, std::string id_a)
{
if (add_to_peers)
{
@ -1374,7 +1409,7 @@ void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bo
// clang-format on
}
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate, nano::stat::dir::out);
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared ());
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::legacy, id_a);
if (frontiers_confirmed)
{
excluded_peers.remove (nano::transport::map_endpoint_to_tcp (endpoint_a));
@ -1388,7 +1423,7 @@ void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bo
}
}
void nano::bootstrap_initiator::bootstrap_lazy (nano::hash_or_account const & hash_or_account_a, bool force, bool confirmed)
void nano::bootstrap_initiator::bootstrap_lazy (nano::hash_or_account const & hash_or_account_a, bool force, bool confirmed, std::string id_a)
{
{
nano::unique_lock<std::mutex> lock (mutex);
@ -1402,7 +1437,7 @@ void nano::bootstrap_initiator::bootstrap_lazy (nano::hash_or_account const & ha
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate_lazy, nano::stat::dir::out);
if (attempt == nullptr)
{
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::lazy);
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::lazy, id_a.empty () ? hash_or_account_a.to_string () : id_a);
}
attempt->lazy_start (hash_or_account_a, confirmed);
}
@ -1416,7 +1451,8 @@ void nano::bootstrap_initiator::bootstrap_wallet (std::deque<nano::account> & ac
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate_wallet_lazy, nano::stat::dir::out);
if (attempt == nullptr)
{
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::wallet_lazy);
std::string id (!accounts_a.empty () ? accounts_a[0].to_account () : "");
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::wallet_lazy, id);
}
attempt->wallet_start (accounts_a);
}

View file

@ -58,7 +58,7 @@ class bulk_push_client;
class bootstrap_attempt final : public std::enable_shared_from_this<bootstrap_attempt>
{
public:
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a = nano::bootstrap_mode::legacy);
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a = nano::bootstrap_mode::legacy, std::string id_a = "");
~bootstrap_attempt ();
void run ();
std::shared_ptr<nano::bootstrap_client> connection (nano::unique_lock<std::mutex> &, bool = false);
@ -82,6 +82,7 @@ public:
void attempt_restart_check (nano::unique_lock<std::mutex> &);
bool confirm_frontiers (nano::unique_lock<std::mutex> &);
bool process_block (std::shared_ptr<nano::block>, nano::account const &, uint64_t, nano::bulk_pull::count_t, bool, unsigned);
std::string mode_text ();
/** Lazy bootstrap */
void lazy_run ();
void lazy_start (nano::hash_or_account const &, bool confirmed = true);
@ -130,6 +131,7 @@ public:
std::atomic<bool> stopped{ false };
std::chrono::steady_clock::time_point attempt_start{ std::chrono::steady_clock::now () };
nano::bootstrap_mode mode;
std::string id;
std::mutex mutex;
nano::condition_variable condition;
// Lazy bootstrap
@ -248,9 +250,9 @@ class bootstrap_initiator final
public:
explicit bootstrap_initiator (nano::node &);
~bootstrap_initiator ();
void bootstrap (nano::endpoint const &, bool add_to_peers = true, bool frontiers_confirmed = false);
void bootstrap (bool force = false);
void bootstrap_lazy (nano::hash_or_account const &, bool force = false, bool confirmed = true);
void bootstrap (nano::endpoint const &, bool add_to_peers = true, bool frontiers_confirmed = false, std::string id_a = "");
void bootstrap (bool force = false, std::string id_a = "");
void bootstrap_lazy (nano::hash_or_account const &, bool force = false, bool confirmed = true, std::string id_a = "");
void bootstrap_wallet (std::deque<nano::account> &);
void run_bootstrap ();
void notify_listeners (bool);

View file

@ -226,7 +226,7 @@ nano::account_info nano::json_handler::account_info_impl (nano::transaction cons
if (node.store.account_get (transaction_a, account_a, result))
{
ec = nano::error_common::account_not_found;
node.bootstrap_initiator.bootstrap_lazy (account_a, false, false);
node.bootstrap_initiator.bootstrap_lazy (account_a, false, false, account_a.to_account ());
}
}
return result;
@ -1598,7 +1598,8 @@ void nano::json_handler::bootstrap ()
{
if (!node.flags.disable_legacy_bootstrap)
{
node.bootstrap_initiator.bootstrap (nano::endpoint (address, port), true, bypass_frontier_confirmation);
std::string bootstrap_id (request.get<std::string> ("id", ""));
node.bootstrap_initiator.bootstrap (nano::endpoint (address, port), true, bypass_frontier_confirmation, bootstrap_id);
response_l.put ("success", "");
}
else
@ -1623,7 +1624,8 @@ void nano::json_handler::bootstrap_any ()
const bool force = request.get<bool> ("force", false);
if (!node.flags.disable_legacy_bootstrap)
{
node.bootstrap_initiator.bootstrap (force);
std::string bootstrap_id (request.get<std::string> ("id", ""));
node.bootstrap_initiator.bootstrap (force, bootstrap_id);
response_l.put ("success", "");
}
else
@ -1641,7 +1643,8 @@ void nano::json_handler::bootstrap_lazy ()
{
if (!node.flags.disable_lazy_bootstrap)
{
node.bootstrap_initiator.bootstrap_lazy (hash, force);
std::string bootstrap_id (request.get<std::string> ("id", ""));
node.bootstrap_initiator.bootstrap_lazy (hash, force, true, bootstrap_id);
response_l.put ("started", "1");
}
else
@ -1662,6 +1665,7 @@ void nano::json_handler::bootstrap_status ()
{
nano::lock_guard<std::mutex> lock (attempt->mutex);
nano::lock_guard<std::mutex> lazy_lock (attempt->lazy_mutex);
response_l.put ("id", attempt->id);
response_l.put ("clients", std::to_string (attempt->clients.size ()));
response_l.put ("pulls", std::to_string (attempt->pulls.size ()));
response_l.put ("pulling", std::to_string (attempt->pulling));
@ -1673,20 +1677,7 @@ void nano::json_handler::bootstrap_status ()
response_l.put ("requeued_pulls", std::to_string (attempt->requeued_pulls));
response_l.put ("frontiers_received", static_cast<bool> (attempt->frontiers_received));
response_l.put ("frontiers_confirmed", static_cast<bool> (attempt->frontiers_confirmed));
std::string mode_text;
if (attempt->mode == nano::bootstrap_mode::legacy)
{
mode_text = "legacy";
}
else if (attempt->mode == nano::bootstrap_mode::lazy)
{
mode_text = "lazy";
}
else if (attempt->mode == nano::bootstrap_mode::wallet_lazy)
{
mode_text = "wallet_lazy";
}
response_l.put ("mode", mode_text);
response_l.put ("mode", attempt->mode_text ());
response_l.put ("lazy_blocks", std::to_string (attempt->lazy_blocks.size ()));
response_l.put ("lazy_state_backlog", std::to_string (attempt->lazy_state_backlog.size ()));
response_l.put ("lazy_balances", std::to_string (attempt->lazy_balances.size ()));

View file

@ -341,6 +341,10 @@ nano::websocket::topic to_topic (std::string const & topic_a)
{
topic = nano::websocket::topic::work;
}
else if (topic_a == "bootstrap")
{
topic = nano::websocket::topic::bootstrap;
}
return topic;
}
@ -372,6 +376,10 @@ std::string from_topic (nano::websocket::topic topic_a)
{
topic = "work";
}
else if (topic_a == nano::websocket::topic::bootstrap)
{
topic = "bootstrap";
}
return topic;
}
}
@ -763,6 +771,38 @@ nano::websocket::message nano::websocket::message_builder::work_failed (nano::bl
return work_generation (root_a, 0, difficulty_a, publish_threshold_a, duration_a, "", bad_peers_a, false, false);
}
nano::websocket::message nano::websocket::message_builder::bootstrap_started (std::string const & id_a, std::string const & mode_a)
{
nano::websocket::message message_l (nano::websocket::topic::bootstrap);
set_common_fields (message_l);
// Bootstrap information
boost::property_tree::ptree bootstrap_l;
bootstrap_l.put ("reason", "started");
bootstrap_l.put ("id", id_a);
bootstrap_l.put ("mode", mode_a);
message_l.contents.add_child ("message", bootstrap_l);
return message_l;
}
nano::websocket::message nano::websocket::message_builder::bootstrap_exited (std::string const & id_a, std::string const & mode_a, std::chrono::steady_clock::time_point const start_time_a, uint64_t const total_blocks_a)
{
nano::websocket::message message_l (nano::websocket::topic::bootstrap);
set_common_fields (message_l);
// Bootstrap information
boost::property_tree::ptree bootstrap_l;
bootstrap_l.put ("reason", "exited");
bootstrap_l.put ("id", id_a);
bootstrap_l.put ("mode", mode_a);
bootstrap_l.put ("total_blocks", total_blocks_a);
bootstrap_l.put ("duration", std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now () - start_time_a).count ());
message_l.contents.add_child ("message", bootstrap_l);
return message_l;
}
void nano::websocket::message_builder::set_common_fields (nano::websocket::message & message_a)
{
using namespace std::chrono;

View file

@ -52,6 +52,8 @@ namespace websocket
active_difficulty,
/** Work generation message */
work,
/** A bootstrap message */
bootstrap,
/** Auxiliary length, not a valid topic, must be the last enum */
_length
};
@ -86,6 +88,8 @@ namespace websocket
message work_generation (nano::block_hash const & root_a, uint64_t const work_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::string const & peer_a, std::vector<std::string> const & bad_peers_a, bool const completed_a = true, bool const cancelled_a = false);
message work_cancelled (nano::block_hash const & root_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::vector<std::string> const & bad_peers_a);
message work_failed (nano::block_hash const & root_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::vector<std::string> const & bad_peers_a);
message bootstrap_started (std::string const & id_a, std::string const & mode_a);
message bootstrap_exited (std::string const & id_a, std::string const & mode_a, std::chrono::steady_clock::time_point const start_time_a, uint64_t const total_blocks_a);
private:
/** Set the common fields for messages: timestamp and topic. */