Indeterminate vote status and enhanced websocket vote sub (#2444)

* Indeterminate vote status and enhanced websocket vote sub

* unhandled stats switch case

* disambiguate conditional expression

* Return vote_code directly from active.vote and replace tribool with simpler approach
This commit is contained in:
Guilherme Lawless 2020-01-09 11:25:57 +01:00 committed by GitHub
commit d511b1fed6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 251 additions and 146 deletions

View file

@ -681,3 +681,49 @@ TEST (active_transactions, restart_dropped)
ASSERT_EQ (work2, block->block_work ());
}
}
TEST (active_transactions, vote_replays)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.enable_voting = false;
auto & node = *system.add_node (node_config);
nano::genesis genesis;
nano::keypair key;
std::error_code ec;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())));
ASSERT_NE (nullptr, send1);
auto open1 (std::make_shared<nano::state_block> (key.pub, 0, key.pub, nano::Gxrb_ratio, send1->hash (), key.prv, key.pub, *system.work.generate (key.pub)));
ASSERT_NE (nullptr, open1);
node.process_active (send1);
node.process_active (open1);
node.block_processor.flush ();
ASSERT_EQ (2, node.active.size ());
// First vote is not a replay and confirms the election, second vote should be indeterminate since the election no longer exists
auto vote_send1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send1));
ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote_send1));
ASSERT_EQ (1, node.active.size ());
ASSERT_EQ (nano::vote_code::indeterminate, node.active.vote (vote_send1));
// Open new account
auto vote_open1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, open1));
ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote_open1));
ASSERT_TRUE (node.active.empty ());
ASSERT_EQ (nano::vote_code::indeterminate, node.active.vote (vote_open1));
ASSERT_EQ (nano::Gxrb_ratio, node.ledger.weight (key.pub));
auto send2 (std::make_shared<nano::state_block> (key.pub, open1->hash (), key.pub, nano::Gxrb_ratio - 1, key.pub, key.prv, key.pub, *system.work.generate (open1->hash ())));
ASSERT_NE (nullptr, send2);
node.process_active (send2);
node.block_processor.flush ();
ASSERT_EQ (1, node.active.size ());
auto vote1_send2 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2));
auto vote2_send2 (std::make_shared<nano::vote> (key.pub, key.prv, 0, send2));
ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote2_send2));
ASSERT_EQ (1, node.active.size ());
ASSERT_EQ (nano::vote_code::replay, node.active.vote (vote2_send2));
ASSERT_EQ (1, node.active.size ());
ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote1_send2));
ASSERT_EQ (0, node.active.size ());
ASSERT_EQ (nano::vote_code::indeterminate, node.active.vote (vote1_send2));
ASSERT_EQ (nano::vote_code::indeterminate, node.active.vote (vote2_send2));
}

View file

@ -760,9 +760,9 @@ TEST (votes, add_one)
ASSERT_EQ (1, votes1->last_votes.size ());
lock.unlock ();
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send1));
ASSERT_FALSE (node1.active.vote (vote1));
ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote1));
auto vote2 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 2, send1));
ASSERT_FALSE (node1.active.vote (vote2));
ASSERT_EQ (nano::vote_code::indeterminate, node1.active.vote (vote2));
lock.lock ();
ASSERT_EQ (2, votes1->last_votes.size ());
auto existing1 (votes1->last_votes.find (nano::test_genesis_key.pub));
@ -790,9 +790,9 @@ TEST (votes, add_two)
nano::keypair key2;
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
auto vote2 (std::make_shared<nano::vote> (key2.pub, key2.prv, 1, send2));
ASSERT_FALSE (node1.active.vote (vote2));
ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote2));
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send1));
ASSERT_FALSE (node1.active.vote (vote1));
ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote1));
lock.lock ();
ASSERT_EQ (3, votes1->last_votes.size ());
ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (nano::test_genesis_key.pub));
@ -821,7 +821,7 @@ TEST (votes, add_existing)
}
node1.active.start (send1);
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send1));
ASSERT_FALSE (node1.active.vote (vote1));
ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote1));
// Block is already processed from vote
ASSERT_TRUE (node1.active.publish (send1));
nano::unique_lock<std::mutex> lock (node1.active.mutex);
@ -834,14 +834,14 @@ TEST (votes, add_existing)
// Pretend we've waited the timeout
votes1->last_votes[nano::test_genesis_key.pub].time = std::chrono::steady_clock::now () - std::chrono::seconds (20);
lock.unlock ();
ASSERT_FALSE (node1.active.vote (vote2));
ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote2));
ASSERT_FALSE (node1.active.publish (send2));
lock.lock ();
ASSERT_EQ (2, votes1->last_votes[nano::test_genesis_key.pub].sequence);
// Also resend the old vote, and see if we respect the sequence number
votes1->last_votes[nano::test_genesis_key.pub].time = std::chrono::steady_clock::now () - std::chrono::seconds (20);
lock.unlock ();
ASSERT_TRUE (node1.active.vote (vote1));
ASSERT_EQ (nano::vote_code::replay, node1.active.vote (vote1));
lock.lock ();
ASSERT_EQ (2, votes1->last_votes[nano::test_genesis_key.pub].sequence);
ASSERT_EQ (2, votes1->last_votes.size ());

View file

@ -2535,7 +2535,7 @@ TEST (node, vote_by_hash_bundle)
nano::keypair key1;
system.wallet (0)->insert_adhoc (key1.prv);
system.nodes[0]->observers.vote.add ([&max_hashes](std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a) {
system.nodes[0]->observers.vote.add ([&max_hashes](std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel>, nano::vote_code) {
if (vote_a->blocks.size () > max_hashes)
{
max_hashes = vote_a->blocks.size ();

View file

@ -80,15 +80,11 @@ boost::optional<std::string> websocket_test_call (std::string host, std::string
/** Tests clients subscribing multiple times or unsubscribing without a subscription */
TEST (websocket, subscription_edge)
{
nano::system system (1);
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);
auto node1 (system.add_node (config));
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation));
@ -158,15 +154,11 @@ TEST (websocket, subscription_edge)
// Test client subscribing to changes in active_difficulty
TEST (websocket, active_difficulty)
{
nano::system system (1);
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);
auto node1 (system.add_node (config));
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty));
@ -226,16 +218,11 @@ TEST (websocket, active_difficulty)
/** Subscribes to block confirmations, confirms a block and then awaits websocket notification */
TEST (websocket, confirmation)
{
nano::system system (1);
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->wallets.create (nano::random_wallet_id ());
node1->start ();
system.nodes.push_back (node1);
auto node1 (system.add_node (config));
// Start websocket test-client in a separate thread
ack_ready = false;
@ -266,7 +253,7 @@ TEST (websocket, confirmation)
ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation));
nano::keypair key;
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto balance = nano::genesis_amount;
auto send_amount = node1->config.online_weight_minimum.number () + 1;
// Quick-confirm a block, legacy blocks should work without filtering
@ -334,16 +321,11 @@ TEST (websocket, confirmation)
/** Tests getting notification of an erased election */
TEST (websocket, stopped_election)
{
nano::system system (1);
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->wallets.create (nano::random_wallet_id ());
node1->start ();
system.nodes.push_back (node1);
auto node1 (system.add_node (config));
// Start websocket test-client in a separate thread
ack_ready = false;
@ -394,16 +376,11 @@ TEST (websocket, stopped_election)
/** Tests the filtering options of block confirmations */
TEST (websocket, confirmation_options)
{
nano::system system (1);
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->wallets.create (nano::random_wallet_id ());
node1->start ();
system.nodes.push_back (node1);
auto node1 (system.add_node (config));
// Start websocket test-client in a separate thread
ack_ready = false;
@ -427,7 +404,7 @@ TEST (websocket, confirmation_options)
ack_ready = false;
// Confirm a state block for an in-wallet account
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;
auto balance = nano::genesis_amount;
auto send_amount = node1->config.online_weight_minimum.number () + 1;
@ -545,16 +522,11 @@ TEST (websocket, confirmation_options)
/** Subscribes to votes, sends a block and awaits websocket notification of a vote arrival */
TEST (websocket, vote)
{
nano::system system (1);
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->wallets.create (nano::random_wallet_id ());
node1->start ();
system.nodes.push_back (node1);
auto node1 (system.add_node (config));
// Start websocket test-client in a separate thread
ack_ready = false;
@ -587,7 +559,7 @@ TEST (websocket, vote)
// Quick-confirm a block
nano::keypair key;
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, nano::genesis_amount - (node1->config.online_weight_minimum.number () + 1), key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (previous)));
node1->process_active (send);
@ -603,19 +575,78 @@ TEST (websocket, vote)
node1->stop ();
}
/** Tests vote subscription options */
TEST (websocket, vote_options)
/** Tests vote subscription options - vote type */
TEST (websocket, vote_options_type)
{
nano::system system (1);
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->wallets.create (nano::random_wallet_id ());
node1->start ();
system.nodes.push_back (node1);
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
// Subscribe to votes and wait for response asynchronously
ack_ready = false;
std::atomic<bool> replay_received{ false };
std::thread client_thread ([&replay_received, config]() {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"include_replays": "true", "include_indeterminate": "false"}})json", true, true);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
auto message_contents = event.get_child ("message");
ASSERT_EQ (1, message_contents.count ("type"));
ASSERT_EQ ("replay", message_contents.get<std::string> ("type"));
replay_received = true;
});
// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote));
// Custom made votes for simplicity
nano::genesis genesis;
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open));
// Indeterminates are not included
{
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote, nano::vote_code::indeterminate));
node1->websocket_server->broadcast (msg);
}
// Replays are included
{
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote, nano::vote_code::replay));
node1->websocket_server->broadcast (msg);
}
// Wait for the websocket client
system.deadline_set (5s);
while (!replay_received)
{
ASSERT_NO_ERROR (system.poll ());
}
client_thread.join ();
node1->stop ();
}
/** Tests vote subscription options - list of representatives */
TEST (websocket, vote_options_representatives)
{
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
// Start websocket test-client in a separate thread
ack_ready = false;
@ -650,7 +681,7 @@ TEST (websocket, vote_options)
// Quick-confirm a block
nano::keypair key;
auto balance = nano::genesis_amount;
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send_amount = node1->config.online_weight_minimum.number () + 1;
auto confirm_block = [&]() {
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
@ -670,10 +701,10 @@ TEST (websocket, vote_options)
std::atomic<bool> client_thread_2_finished{ false };
std::thread client_thread_2 ([&client_thread_2_finished, config]() {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port),
R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true, 1s);
R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true);
// No response expected given the filter
ASSERT_FALSE (response);
// A list of invalid representatives is the same as no filter
ASSERT_TRUE (response);
client_thread_2_finished = true;
});
@ -690,7 +721,6 @@ TEST (websocket, vote_options)
// Confirm another block
confirm_block ();
// No response expected
system.deadline_set (5s);
while (!client_thread_2_finished)
{
@ -705,15 +735,11 @@ TEST (websocket, vote_options)
// Test client subscribing to notifications for work generation
TEST (websocket, work)
{
nano::system system (1);
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);
auto node1 (system.add_node (config));
ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::work));
@ -782,15 +808,12 @@ TEST (websocket, work)
/** Tests clients subscribing multiple times or unsubscribing without a subscription */
TEST (websocket, ws_keepalive)
{
nano::system system (1);
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);
ack_ready = false;
std::thread subscription_thread ([config]() {
websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "ping"})json", true, false);

View file

@ -564,6 +564,9 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::vote_replay:
res = "vote_replay";
break;
case nano::stat::detail::vote_indeterminate:
res = "vote_indeterminate";
break;
case nano::stat::detail::vote_invalid:
res = "vote_invalid";
break;

View file

@ -255,6 +255,7 @@ public:
// vote specific
vote_valid,
vote_replay,
vote_indeterminate,
vote_invalid,
vote_overflow,

View file

@ -530,9 +530,11 @@ bool nano::active_transactions::add (std::shared_ptr<nano::block> block_a, bool
}
// Validate a vote and apply it to the current election if one exists
bool nano::active_transactions::vote (std::shared_ptr<nano::vote> vote_a, bool single_lock)
nano::vote_code nano::active_transactions::vote (std::shared_ptr<nano::vote> vote_a, bool single_lock)
{
std::shared_ptr<nano::election> election;
// If none of the hashes are active, it is unknown whether it's a replay
// In this case, votes are also not republished
bool at_least_one (false);
bool replay (false);
bool processed (false);
{
@ -550,9 +552,10 @@ bool nano::active_transactions::vote (std::shared_ptr<nano::vote> vote_a, bool s
auto existing (blocks.find (block_hash));
if (existing != blocks.end ())
{
at_least_one = true;
result = existing->second->vote (vote_a->account, vote_a->sequence, block_hash);
}
else
else // possibly a vote for a recently confirmed election
{
add_inactive_votes_cache (block_hash, vote_a->account);
}
@ -563,6 +566,7 @@ bool nano::active_transactions::vote (std::shared_ptr<nano::vote> vote_a, bool s
auto existing (roots.get<tag_root> ().find (block->qualified_root ()));
if (existing != roots.get<tag_root> ().end ())
{
at_least_one = true;
result = existing->election->vote (vote_a->account, vote_a->sequence, block->hash ());
}
else
@ -570,15 +574,22 @@ bool nano::active_transactions::vote (std::shared_ptr<nano::vote> vote_a, bool s
add_inactive_votes_cache (block->hash (), vote_a->account);
}
}
replay = replay || result.replay;
processed = processed || result.processed;
replay = replay || result.replay;
}
}
if (processed)
if (at_least_one)
{
node.network.flood_vote (vote_a);
if (processed)
{
node.network.flood_vote (vote_a);
}
return replay ? nano::vote_code::replay : nano::vote_code::vote;
}
else
{
return nano::vote_code::indeterminate;
}
return replay;
}
bool nano::active_transactions::active (nano::qualified_root const & root_a)

View file

@ -79,9 +79,8 @@ public:
// clang-format off
bool start (std::shared_ptr<nano::block>, bool const = false, std::function<void(std::shared_ptr<nano::block>)> const & = [](std::shared_ptr<nano::block>) {});
// clang-format on
// If this returns true, the vote is a replay
// If this returns false, the vote may or may not be a replay
bool vote (std::shared_ptr<nano::vote>, bool = false);
// Distinguishes replay votes, cannot be determined if the block is not in any election
nano::vote_code vote (std::shared_ptr<nano::vote>, bool = false);
// Is the root of this block in the roots container
bool active (nano::block const &);
bool active (nano::qualified_root const &);

View file

@ -303,38 +303,41 @@ startup_time (std::chrono::steady_clock::now ())
this->network.send_keepalive_self (channel_a);
}
});
observers.vote.add ([this](std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a) {
this->gap_cache.vote (vote_a);
this->online_reps.observe (vote_a->account);
nano::uint128_t rep_weight;
observers.vote.add ([this](std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a, nano::vote_code code_a) {
if (code_a == nano::vote_code::vote || code_a == nano::vote_code::indeterminate)
{
rep_weight = ledger.weight (vote_a->account);
}
if (rep_weight > minimum_principal_weight ())
{
bool rep_crawler_exists (false);
for (auto hash : *vote_a)
this->gap_cache.vote (vote_a);
this->online_reps.observe (vote_a->account);
nano::uint128_t rep_weight;
{
if (this->rep_crawler.exists (hash))
{
rep_crawler_exists = true;
break;
}
rep_weight = ledger.weight (vote_a->account);
}
if (rep_crawler_exists)
if (rep_weight > minimum_principal_weight ())
{
// We see a valid non-replay vote for a block we requested, this node is probably a representative
if (this->rep_crawler.response (channel_a, vote_a->account, rep_weight))
bool rep_crawler_exists (false);
for (auto hash : *vote_a)
{
logger.try_log (boost::str (boost::format ("Found a representative at %1%") % channel_a->to_string ()));
// Rebroadcasting all active votes to new representative
auto blocks (this->active.list_blocks (true));
for (auto i (blocks.begin ()), n (blocks.end ()); i != n; ++i)
if (this->rep_crawler.exists (hash))
{
if (*i != nullptr)
rep_crawler_exists = true;
break;
}
}
if (rep_crawler_exists)
{
// We see a valid non-replay vote for a block we requested, this node is probably a representative
if (this->rep_crawler.response (channel_a, vote_a->account, rep_weight))
{
logger.try_log (boost::str (boost::format ("Found a representative at %1%") % channel_a->to_string ()));
// Rebroadcasting all active votes to new representative
auto blocks (this->active.list_blocks (true));
for (auto i (blocks.begin ()), n (blocks.end ()); i != n; ++i)
{
nano::confirm_req req (*i);
channel_a->send (req);
if (*i != nullptr)
{
nano::confirm_req req (*i);
channel_a->send (req);
}
}
}
}
@ -343,11 +346,11 @@ startup_time (std::chrono::steady_clock::now ())
});
if (websocket_server)
{
observers.vote.add ([this](std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a) {
observers.vote.add ([this](std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a, nano::vote_code code_a) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::vote))
{
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote_a));
auto msg (builder.vote_received (vote_a, code_a));
this->websocket_server->broadcast (msg);
}
});

View file

@ -13,7 +13,7 @@ public:
using blocks_t = nano::observer_set<nano::election_status const &, nano::account const &, nano::uint128_t const &, bool>;
blocks_t blocks;
nano::observer_set<bool> wallet;
nano::observer_set<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>> vote;
nano::observer_set<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>, nano::vote_code> vote;
nano::observer_set<nano::block_hash const &> active_stopped;
nano::observer_set<nano::account const &, bool> account_balance;
nano::observer_set<std::shared_ptr<nano::transport::channel>> endpoint;

View file

@ -202,29 +202,16 @@ nano::vote_code nano::vote_processor::vote_blocking (nano::transaction const & t
auto result (nano::vote_code::invalid);
if (validated || !vote_a->validate ())
{
result = active.vote (vote_a, true);
observers.vote.notify (vote_a, channel_a, result);
// This tries to assist rep nodes that have lost track of their highest sequence number by replaying our highest known vote back to them
// Only do this if the sequence number is significantly different to account for network reordering
// Amplify attack considerations: We're sending out a confirm_ack in response to a confirm_ack for no net traffic increase
auto max_vote (store.vote_max (transaction_a, vote_a));
result = nano::vote_code::replay;
if (!active.vote (vote_a, true))
if (max_vote->sequence > vote_a->sequence + 10000)
{
result = nano::vote_code::vote;
}
switch (result)
{
case nano::vote_code::vote:
observers.vote.notify (vote_a, channel_a);
case nano::vote_code::replay:
// This tries to assist rep nodes that have lost track of their highest sequence number by replaying our highest known vote back to them
// Only do this if the sequence number is significantly different to account for network reordering
// Amplify attack considerations: We're sending out a confirm_ack in response to a confirm_ack for no net traffic increase
if (max_vote->sequence > vote_a->sequence + 10000)
{
nano::confirm_ack confirm (max_vote);
channel_a->send (confirm); // this is non essential traffic as it will be resolicited if not received
}
break;
case nano::vote_code::invalid:
assert (false);
break;
nano::confirm_ack confirm (max_vote);
channel_a->send (confirm); // this is non essential traffic as it will be resolicited if not received
}
}
std::string status;
@ -242,6 +229,10 @@ nano::vote_code nano::vote_processor::vote_blocking (nano::transaction const & t
status = "Vote";
stats.inc (nano::stat::type::vote, nano::stat::detail::vote_valid);
break;
case nano::vote_code::indeterminate:
status = "Indeterminate";
stats.inc (nano::stat::type::vote, nano::stat::detail::vote_indeterminate);
break;
}
if (config.logging.vote_logging ())
{

View file

@ -138,6 +138,8 @@ bool nano::websocket::confirmation_options::should_filter (nano::websocket::mess
nano::websocket::vote_options::vote_options (boost::property_tree::ptree const & options_a, nano::logger_mt & logger_a)
{
include_replays = options_a.get<bool> ("include_replays", false);
include_indeterminate = options_a.get<bool> ("include_indeterminate", false);
auto representatives_l (options_a.get_child_optional ("representatives"));
if (representatives_l)
{
@ -154,21 +156,25 @@ nano::websocket::vote_options::vote_options (boost::property_tree::ptree const &
logger_a.always_log ("Websocket: invalid account given to filter votes: ", representative_l.second.data ());
}
}
}
// Warn the user if the options resulted in an empty filter
if (representatives.empty ())
{
logger_a.always_log ("Websocket: provided options resulted in an empty vote filter");
// Warn the user if the option will be ignored
if (representatives.empty ())
{
logger_a.always_log ("Websocket: account filter for votes is empty, no messages will be filtered");
}
}
}
bool nano::websocket::vote_options::should_filter (nano::websocket::message const & message_a) const
{
bool should_filter_l (true);
auto representative_text_l (message_a.contents.get<std::string> ("message.account"));
if (representatives.find (representative_text_l) != representatives.end ())
auto type (message_a.contents.get<std::string> ("message.type"));
bool should_filter_l = (!include_replays && type == "replay") || (!include_indeterminate && type == "indeterminate");
if (!should_filter_l && !representatives.empty ())
{
should_filter_l = false;
auto representative_text_l (message_a.contents.get<std::string> ("message.account"));
if (representatives.find (representative_text_l) == representatives.end ())
{
should_filter_l = true;
}
}
return should_filter_l;
}
@ -656,7 +662,7 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std:
return message_l;
}
nano::websocket::message nano::websocket::message_builder::vote_received (std::shared_ptr<nano::vote> vote_a)
nano::websocket::message nano::websocket::message_builder::vote_received (std::shared_ptr<nano::vote> vote_a, nano::vote_code code_a)
{
nano::websocket::message message_l (nano::websocket::topic::vote);
set_common_fields (message_l);
@ -664,6 +670,25 @@ nano::websocket::message nano::websocket::message_builder::vote_received (std::s
// Vote information
boost::property_tree::ptree vote_node_l;
vote_a->serialize_json (vote_node_l);
// Vote processing information
std::string vote_type = "invalid";
switch (code_a)
{
case nano::vote_code::vote:
vote_type = "vote";
break;
case nano::vote_code::replay:
vote_type = "replay";
break;
case nano::vote_code::indeterminate:
vote_type = "indeterminate";
break;
case nano::vote_code::invalid:
assert (false);
break;
}
vote_node_l.put ("type", vote_type);
message_l.contents.add_child ("message", vote_node_l);
return message_l;
}

View file

@ -81,7 +81,7 @@ namespace websocket
public:
message block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype, bool include_block, nano::election_status const & election_status_a, nano::websocket::confirmation_options const & options_a);
message stopped_election (nano::block_hash const & hash_a);
message vote_received (std::shared_ptr<nano::vote> vote_a);
message vote_received (std::shared_ptr<nano::vote> vote_a, nano::vote_code code_a);
message difficulty_changed (uint64_t publish_threshold_a, uint64_t difficulty_active_a);
message work_generation (nano::block_hash const & root_a, uint64_t const work_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::string const & peer_a, std::vector<std::string> const & bad_peers_a, bool const completed_a = true, bool const cancelled_a = false);
message work_cancelled (nano::block_hash const & root_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::vector<std::string> const & bad_peers_a);
@ -179,6 +179,8 @@ namespace websocket
private:
std::unordered_set<std::string> representatives;
bool include_replays{ false };
bool include_indeterminate{ false };
};
/** A websocket session managing its own lifetime */

View file

@ -282,7 +282,8 @@ enum class vote_code
{
invalid, // Vote is not signed correctly
replay, // Vote does not have the highest sequence number, it's a replay
vote // Vote has the highest sequence number
vote, // Vote has the highest sequence number
indeterminate // Unknown if replay or vote
};
enum class process_result