Vote replay in crawler (#2497)
* Move vote replay code in to rep_crawler since this is a very infrequently needed codepath and it only really needs to be done once and can be done during rep discovery. * Directly calling vote_blocking instead of creating a new deque of valid votes. * Fixing timeout for possibly slow test situations.
This commit is contained in:
parent
2d503c3651
commit
b2c094a98d
6 changed files with 43 additions and 70 deletions
|
@ -742,11 +742,10 @@ TEST (votes, check_signature)
|
|||
}
|
||||
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send1));
|
||||
vote1->signature.bytes[0] ^= 1;
|
||||
auto transaction (node1.store.tx_begin_read ());
|
||||
ASSERT_EQ (nano::vote_code::invalid, node1.vote_processor.vote_blocking (transaction, vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
|
||||
ASSERT_EQ (nano::vote_code::invalid, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
|
||||
vote1->signature.bytes[0] ^= 1;
|
||||
ASSERT_EQ (nano::vote_code::vote, node1.vote_processor.vote_blocking (transaction, vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
|
||||
ASSERT_EQ (nano::vote_code::replay, node1.vote_processor.vote_blocking (transaction, vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
|
||||
ASSERT_EQ (nano::vote_code::vote, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
|
||||
ASSERT_EQ (nano::vote_code::replay, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
|
||||
}
|
||||
|
||||
TEST (votes, add_one)
|
||||
|
@ -878,13 +877,13 @@ TEST (votes, add_old)
|
|||
votes1 = node1.active.roots.find (send1->qualified_root ())->election;
|
||||
}
|
||||
auto channel (std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
|
||||
node1.vote_processor.vote_blocking (transaction, vote1, channel);
|
||||
node1.vote_processor.vote_blocking (vote1, channel);
|
||||
nano::keypair key2;
|
||||
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
|
||||
node1.work_generate_blocking (*send2);
|
||||
auto vote2 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send2));
|
||||
votes1->last_votes[nano::test_genesis_key.pub].time = std::chrono::steady_clock::now () - std::chrono::seconds (20);
|
||||
node1.vote_processor.vote_blocking (transaction, vote2, channel);
|
||||
node1.vote_processor.vote_blocking (vote2, channel);
|
||||
ASSERT_EQ (2, votes1->last_votes_size ());
|
||||
ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (nano::test_genesis_key.pub));
|
||||
ASSERT_EQ (send1->hash (), votes1->last_votes[nano::test_genesis_key.pub].hash);
|
||||
|
@ -919,12 +918,12 @@ TEST (votes, add_old_different_account)
|
|||
ASSERT_EQ (1, votes2->last_votes_size ());
|
||||
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 2, send1));
|
||||
auto channel (std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
|
||||
auto vote_result1 (node1.vote_processor.vote_blocking (transaction, vote1, channel));
|
||||
auto vote_result1 (node1.vote_processor.vote_blocking (vote1, channel));
|
||||
ASSERT_EQ (nano::vote_code::vote, vote_result1);
|
||||
ASSERT_EQ (2, votes1->last_votes.size ());
|
||||
ASSERT_EQ (1, votes2->last_votes.size ());
|
||||
auto vote2 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send2));
|
||||
auto vote_result2 (node1.vote_processor.vote_blocking (transaction, vote2, channel));
|
||||
auto vote_result2 (node1.vote_processor.vote_blocking (vote2, channel));
|
||||
ASSERT_EQ (nano::vote_code::vote, vote_result2);
|
||||
ASSERT_EQ (2, votes1->last_votes.size ());
|
||||
ASSERT_EQ (2, votes2->last_votes.size ());
|
||||
|
@ -957,12 +956,12 @@ TEST (votes, add_cooldown)
|
|||
}
|
||||
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send1));
|
||||
auto channel (std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
|
||||
node1.vote_processor.vote_blocking (transaction, vote1, channel);
|
||||
node1.vote_processor.vote_blocking (vote1, channel);
|
||||
nano::keypair key2;
|
||||
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
|
||||
node1.work_generate_blocking (*send2);
|
||||
auto vote2 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 2, send2));
|
||||
node1.vote_processor.vote_blocking (transaction, vote2, channel);
|
||||
node1.vote_processor.vote_blocking (vote2, channel);
|
||||
ASSERT_EQ (2, votes1->last_votes.size ());
|
||||
ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (nano::test_genesis_key.pub));
|
||||
ASSERT_EQ (send1->hash (), votes1->last_votes[nano::test_genesis_key.pub].hash);
|
||||
|
|
|
@ -2286,34 +2286,31 @@ TEST (node, send_callback)
|
|||
// This helps representatives continue from their last sequence number if their node is reinitialized and the old sequence number is lost
|
||||
TEST (node, vote_replay)
|
||||
{
|
||||
nano::system system (2);
|
||||
nano::system system (1);
|
||||
auto & node1 (*system.nodes[0]);
|
||||
auto & node2 (*system.nodes[1]);
|
||||
nano::keypair key;
|
||||
auto open (std::make_shared<nano::open_block> (0, 1, key.pub, key.prv, key.pub, 0));
|
||||
node1.work_generate_blocking (*open);
|
||||
nano::genesis genesis;
|
||||
for (auto i (0); i < 11000; ++i)
|
||||
{
|
||||
auto transaction (node2.store.tx_begin_read ());
|
||||
auto vote (node2.store.vote_generate (transaction, nano::test_genesis_key.pub, nano::test_genesis_key.prv, open));
|
||||
}
|
||||
{
|
||||
auto transaction (node1.store.tx_begin_read ());
|
||||
nano::lock_guard<std::mutex> lock (node1.store.get_cache_mutex ());
|
||||
auto vote (node1.store.vote_current (transaction, nano::test_genesis_key.pub));
|
||||
auto vote (node1.store.vote_generate (transaction, nano::test_genesis_key.pub, nano::test_genesis_key.prv, genesis.open));
|
||||
}
|
||||
auto node2 = system.add_node ();
|
||||
{
|
||||
auto transaction (node2->store.tx_begin_read ());
|
||||
nano::lock_guard<std::mutex> lock (node2->store.get_cache_mutex ());
|
||||
auto vote (node2->store.vote_current (transaction, nano::test_genesis_key.pub));
|
||||
ASSERT_EQ (nullptr, vote);
|
||||
}
|
||||
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
|
||||
auto block (system.wallet (0)->send_action (nano::test_genesis_key.pub, key.pub, nano::Gxrb_ratio));
|
||||
ASSERT_NE (nullptr, block);
|
||||
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
|
||||
auto done (false);
|
||||
system.deadline_set (20s);
|
||||
while (!done)
|
||||
{
|
||||
auto ec = system.poll ();
|
||||
auto transaction (node1.store.tx_begin_read ());
|
||||
nano::lock_guard<std::mutex> lock (node1.store.get_cache_mutex ());
|
||||
auto vote (node1.store.vote_current (transaction, nano::test_genesis_key.pub));
|
||||
auto transaction (node2->store.tx_begin_read ());
|
||||
nano::lock_guard<std::mutex> lock (node2->store.get_cache_mutex ());
|
||||
auto vote (node2->store.vote_current (transaction, nano::test_genesis_key.pub));
|
||||
done = vote && (vote->sequence >= 10000);
|
||||
ASSERT_NO_ERROR (ec);
|
||||
}
|
||||
|
@ -3010,10 +3007,7 @@ TEST (node, fork_invalid_block_signature_vote_by_hash)
|
|||
std::vector<nano::block_hash> vote_blocks;
|
||||
vote_blocks.push_back (send2->hash ());
|
||||
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, vote_blocks));
|
||||
{
|
||||
auto transaction (node1.store.tx_begin_read ());
|
||||
node1.vote_processor.vote_blocking (transaction, vote, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
|
||||
}
|
||||
node1.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
|
||||
while (node1.block (send1->hash ()))
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
|
@ -3179,10 +3173,7 @@ TEST (node, confirm_back)
|
|||
std::vector<nano::block_hash> vote_blocks;
|
||||
vote_blocks.push_back (send2->hash ());
|
||||
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, vote_blocks));
|
||||
{
|
||||
auto transaction (node.store.tx_begin_read ());
|
||||
node.vote_processor.vote_blocking (transaction, vote, std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
|
||||
}
|
||||
node.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
|
||||
system.deadline_set (10s);
|
||||
while (!node.active.empty ())
|
||||
{
|
||||
|
|
|
@ -131,7 +131,7 @@ bootstrap_initiator (*this),
|
|||
bootstrap (config.peering_port, *this),
|
||||
application_path (application_path_a),
|
||||
port_mapping (*this),
|
||||
vote_processor (checker, active, store, observers, stats, config, logger, online_reps, ledger, network_params),
|
||||
vote_processor (checker, active, observers, stats, config, logger, online_reps, ledger, network_params),
|
||||
rep_crawler (*this),
|
||||
warmed_up (0),
|
||||
block_processor (*this, write_database_queue),
|
||||
|
|
|
@ -38,6 +38,7 @@ void nano::rep_crawler::validate ()
|
|||
nano::lock_guard<std::mutex> lock (active_mutex);
|
||||
responses_l.swap (responses);
|
||||
}
|
||||
auto transaction (node.store.tx_begin_read ());
|
||||
auto minimum = node.minimum_principal_weight ();
|
||||
for (auto const & i : responses_l)
|
||||
{
|
||||
|
@ -85,6 +86,15 @@ void nano::rep_crawler::validate ()
|
|||
}
|
||||
}
|
||||
}
|
||||
// This tries to assist rep nodes that have lost track of their highest sequence number by replaying our highest known vote back to them
|
||||
// Only do this if the sequence number is significantly different to account for network reordering
|
||||
// Amplify attack considerations: We're sending out a confirm_ack in response to a confirm_ack for no net traffic increase
|
||||
auto max_vote (node.store.vote_max (transaction, vote));
|
||||
if (max_vote->sequence > vote->sequence + 10000)
|
||||
{
|
||||
nano::confirm_ack confirm (max_vote);
|
||||
channel->send (confirm); // this is non essential traffic as it will be resolicited if not received
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,10 +15,9 @@
|
|||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
nano::vote_processor::vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::block_store & store_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a) :
|
||||
nano::vote_processor::vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a) :
|
||||
checker (checker_a),
|
||||
active (active_a),
|
||||
store (store_a),
|
||||
observers (observers_a),
|
||||
stats (stats_a),
|
||||
config (config_a),
|
||||
|
@ -54,7 +53,7 @@ void nano::vote_processor::process_loop ()
|
|||
{
|
||||
if (!votes.empty ())
|
||||
{
|
||||
std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> votes_l;
|
||||
decltype (votes) votes_l;
|
||||
votes_l.swap (votes);
|
||||
|
||||
log_this_iteration = false;
|
||||
|
@ -70,20 +69,6 @@ void nano::vote_processor::process_loop ()
|
|||
is_active = true;
|
||||
lock.unlock ();
|
||||
verify_votes (votes_l);
|
||||
{
|
||||
auto transaction (store.tx_begin_read ());
|
||||
uint64_t count (1);
|
||||
for (auto & i : votes_l)
|
||||
{
|
||||
vote_blocking (transaction, i.first, i.second, true);
|
||||
// Free active_transactions mutex each 100 processed votes
|
||||
if (count % 100 == 0)
|
||||
{
|
||||
transaction.refresh ();
|
||||
}
|
||||
count++;
|
||||
}
|
||||
}
|
||||
lock.lock ();
|
||||
is_active = false;
|
||||
|
||||
|
@ -155,7 +140,7 @@ void nano::vote_processor::vote (std::shared_ptr<nano::vote> vote_a, std::shared
|
|||
}
|
||||
}
|
||||
|
||||
void nano::vote_processor::verify_votes (std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> & votes_a)
|
||||
void nano::vote_processor::verify_votes (decltype (votes) const & votes_a)
|
||||
{
|
||||
auto size (votes_a.size ());
|
||||
std::vector<unsigned char const *> messages;
|
||||
|
@ -178,37 +163,26 @@ void nano::vote_processor::verify_votes (std::deque<std::pair<std::shared_ptr<na
|
|||
}
|
||||
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
|
||||
checker.verify (check);
|
||||
std::remove_reference_t<decltype (votes_a)> result;
|
||||
auto i (0);
|
||||
for (auto const & vote : votes_a)
|
||||
{
|
||||
assert (verifications[i] == 1 || verifications[i] == 0);
|
||||
if (verifications[i] == 1)
|
||||
{
|
||||
result.push_back (vote);
|
||||
vote_blocking (vote.first, vote.second, true);
|
||||
}
|
||||
++i;
|
||||
}
|
||||
votes_a.swap (result);
|
||||
}
|
||||
|
||||
// node.active.mutex lock required
|
||||
nano::vote_code nano::vote_processor::vote_blocking (nano::transaction const & transaction_a, std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a, bool validated)
|
||||
nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a, bool validated)
|
||||
{
|
||||
auto result (nano::vote_code::invalid);
|
||||
if (validated || !vote_a->validate ())
|
||||
{
|
||||
result = active.vote (vote_a);
|
||||
observers.vote.notify (vote_a, channel_a, result);
|
||||
// This tries to assist rep nodes that have lost track of their highest sequence number by replaying our highest known vote back to them
|
||||
// Only do this if the sequence number is significantly different to account for network reordering
|
||||
// Amplify attack considerations: We're sending out a confirm_ack in response to a confirm_ack for no net traffic increase
|
||||
auto max_vote (store.vote_max (transaction_a, vote_a));
|
||||
if (max_vote->sequence > vote_a->sequence + 10000)
|
||||
{
|
||||
nano::confirm_ack confirm (max_vote);
|
||||
channel_a->send (confirm); // this is non essential traffic as it will be resolicited if not received
|
||||
}
|
||||
}
|
||||
std::string status;
|
||||
switch (result)
|
||||
|
|
|
@ -32,11 +32,11 @@ namespace transport
|
|||
class vote_processor final
|
||||
{
|
||||
public:
|
||||
explicit vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::block_store & store_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a);
|
||||
explicit vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a);
|
||||
void vote (std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>);
|
||||
/** Note: node.active.mutex lock is required */
|
||||
nano::vote_code vote_blocking (nano::transaction const &, std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>, bool = false);
|
||||
void verify_votes (std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> &);
|
||||
nano::vote_code vote_blocking (std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>, bool = false);
|
||||
void verify_votes (std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> const &);
|
||||
void flush ();
|
||||
void calculate_weights ();
|
||||
void stop ();
|
||||
|
@ -46,7 +46,6 @@ private:
|
|||
|
||||
nano::signature_checker & checker;
|
||||
nano::active_transactions & active;
|
||||
nano::block_store & store;
|
||||
nano::node_observers & observers;
|
||||
nano::stat & stats;
|
||||
nano::node_config & config;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue