Web socket subscription for active difficulty (#2091)

* Add active_difficulty topic

* Topic to string for active_difficulty

* Add difficulty_changed message builder

* Add difficulty observer

* Notify observer if difficulty changes

* Add test for subscribe_active_difficulty

* Update comment

* Run clang format all

* Use boost ptree double get instead of std::stod

* Rename test and for std::launch::async policy

* Remove difficulty_observer and call node.observers.difficulty directly

* Fix formatting issue
This commit is contained in:
Chris Linegar 2019-06-27 14:20:23 +01:00 committed by cryptocode
commit 311f8cb611
6 changed files with 110 additions and 0 deletions

View file

@ -160,6 +160,72 @@ TEST (websocket, subscription_edge)
node1->stop ();
}
// Test client subscribing to changes in active_difficulty
TEST (websocket, active_difficulty)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty));
// Subscribe to active_difficulty and wait for response asynchronously
ack_ready = false;
auto client_task = ([&node1]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", "24078", R"json({"action": "subscribe", "topic": "active_difficulty", "ack": true})json", true, true);
return response;
});
auto client_future = std::async (std::launch::async, client_task);
// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty));
// Fake history records to force trended_active_difficulty change
node1->active.multipliers_cb.push_front (10.);
// Wait to receive the active_difficulty message
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}
// Check active_difficulty response
auto response = client_future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
boost::property_tree::ptree event;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "active_difficulty");
auto message_contents = event.get_child ("message");
uint64_t network_minimum;
nano::from_string_hex (message_contents.get<std::string> ("network_minimum"), network_minimum);
ASSERT_EQ (network_minimum, node1->network_params.network.publish_threshold);
uint64_t network_current;
nano::from_string_hex (message_contents.get<std::string> ("network_current"), network_current);
ASSERT_EQ (network_current, node1->active.active_difficulty ());
double multiplier = message_contents.get<double> ("multiplier");
ASSERT_NEAR (multiplier, nano::difficulty::to_multiplier (node1->active.active_difficulty (), node1->network_params.network.publish_threshold), 1e-6);
node1->stop ();
}
/** Subscribes to block confirmations, confirms a block and then awaits websocket notification */
TEST (websocket, confirmation)
{

View file

@ -637,7 +637,13 @@ void nano::active_transactions::update_active_difficulty (std::unique_lock<std::
auto sum (std::accumulate (multipliers_cb.begin (), multipliers_cb.end (), double(0)));
auto difficulty = nano::difficulty::from_multiplier (sum / multipliers_cb.size (), node.network_params.network.publish_threshold);
assert (difficulty >= node.network_params.network.publish_threshold);
bool notify_change = trended_active_difficulty != difficulty;
trended_active_difficulty = difficulty;
if (notify_change)
{
node.observers.difficulty.notify (trended_active_difficulty);
}
}
uint64_t nano::active_transactions::active_difficulty ()

View file

@ -351,6 +351,15 @@ startup_time (std::chrono::steady_clock::now ())
this->websocket_server->broadcast (builder.stopped_election (hash_a));
}
});
observers.difficulty.add ([this](uint64_t active_difficulty) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::active_difficulty))
{
nano::websocket::message_builder builder;
auto msg (builder.difficulty_changed (network_params.network.publish_threshold, active_difficulty));
this->websocket_server->broadcast (msg);
}
});
}
// Add block confirmation type stats regardless of http-callback and websocket subscriptions
observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {

View file

@ -20,6 +20,7 @@ public:
nano::observer_set<nano::account const &, bool> account_balance;
nano::observer_set<std::shared_ptr<nano::transport::channel>> endpoint;
nano::observer_set<> disconnect;
nano::observer_set<uint64_t> difficulty;
};
std::unique_ptr<seq_con_info_component> collect_seq_con_info (node_observers & node_observers, const std::string & name);

View file

@ -316,6 +316,11 @@ nano::websocket::topic to_topic (std::string topic_a)
{
topic = nano::websocket::topic::ack;
}
else if (topic_a == "active_difficulty")
{
topic = nano::websocket::topic::active_difficulty;
}
return topic;
}
@ -338,6 +343,10 @@ std::string from_topic (nano::websocket::topic topic_a)
{
topic = "ack";
}
else if (topic_a == nano::websocket::topic::active_difficulty)
{
topic = "active_difficulty";
}
return topic;
}
}
@ -616,6 +625,22 @@ nano::websocket::message nano::websocket::message_builder::vote_received (std::s
return message_l;
}
nano::websocket::message nano::websocket::message_builder::difficulty_changed (uint64_t publish_threshold, uint64_t difficulty_active)
{
nano::websocket::message message_l (nano::websocket::topic::active_difficulty);
set_common_fields (message_l);
// Active difficulty information
boost::property_tree::ptree difficulty_l;
difficulty_l.put ("network_minimum", nano::to_string_hex (publish_threshold));
difficulty_l.put ("network_current", nano::to_string_hex (difficulty_active));
auto multiplier = nano::difficulty::to_multiplier (difficulty_active, publish_threshold);
difficulty_l.put ("multiplier", nano::to_string (multiplier));
message_l.contents.add_child ("message", difficulty_l);
return message_l;
}
void nano::websocket::message_builder::set_common_fields (nano::websocket::message & message_a)
{
using namespace std::chrono;

View file

@ -53,6 +53,8 @@ namespace websocket
stopped_election,
/** A vote message **/
vote,
/** An active difficulty message */
active_difficulty,
/** Auxiliary length, not a valid topic, must be the last enum */
_length
};
@ -83,6 +85,7 @@ namespace websocket
message block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype, bool include_block, nano::election_status_type election_status_type_a);
message stopped_election (nano::block_hash const & hash_a);
message vote_received (std::shared_ptr<nano::vote> vote_a);
message difficulty_changed (uint64_t publish_threshold, uint64_t difficulty_active);
private:
/** Set the common fields for messages: timestamp and topic. */