Keeping entire vote instead of just the sequence number so it can be replayed out to the network.

This commit is contained in:
clemahieu 2017-09-24 20:42:27 -05:00
commit fe17d2ad4d
13 changed files with 255 additions and 178 deletions

View file

@ -589,25 +589,28 @@ TEST (block_store, sequence_increment)
bool init (false);
rai::block_store store (init, rai::unique_path ());
ASSERT_TRUE (!init);
rai::account account1 (1);
rai::account account2 (2);
rai::keypair key1;
rai::keypair key2;
auto block1 (std::make_shared <rai::open_block> (0, 1, 0, rai::keypair ().prv, 0, 0));
rai::transaction transaction (store.environment, nullptr, true);
auto seq1 (store.sequence_atomic_inc (transaction, account1));
ASSERT_EQ (1, seq1);
auto seq2 (store.sequence_atomic_inc (transaction, account1));
ASSERT_EQ (2, seq2);
auto seq3 (store.sequence_atomic_inc (transaction, account2));
ASSERT_EQ (1, seq3);
auto seq4 (store.sequence_atomic_inc (transaction, account2));
ASSERT_EQ (2, seq4);
auto seq5 (store.sequence_atomic_observe (transaction, account1, 20));
ASSERT_EQ (20, seq5);
auto seq6 (store.sequence_atomic_observe (transaction, account2, 30));
ASSERT_EQ (30, seq6);
auto seq7 (store.sequence_atomic_inc (transaction, account1));
ASSERT_EQ (21, seq7);
auto seq8 (store.sequence_atomic_inc (transaction, account2));
ASSERT_EQ (31, seq8);
auto vote1 (store.vote_generate (transaction, key1.pub, key1.prv, block1));
ASSERT_EQ (1, vote1->sequence);
auto vote2 (store.vote_generate (transaction, key1.pub, key1.prv, block1));
ASSERT_EQ (2, vote2->sequence);
auto vote3 (store.vote_generate (transaction, key2.pub, key2.prv, block1));
ASSERT_EQ (1, vote3->sequence);
auto vote4 (store.vote_generate (transaction, key2.pub, key2.prv, block1));
ASSERT_EQ (2, vote4->sequence);
vote1->sequence = 20;
auto seq5 (store.vote_max (transaction, vote1));
ASSERT_EQ (20, seq5->sequence);
vote3->sequence = 30;
auto seq6 (store.vote_max (transaction, vote3));
ASSERT_EQ (30, seq6->sequence);
auto vote5 (store.vote_generate (transaction, key1.pub, key1.prv, block1));
ASSERT_EQ (21, vote5->sequence);
auto vote6 (store.vote_generate (transaction, key2.pub, key2.prv, block1));
ASSERT_EQ (31, vote6->sequence);
}
TEST (block_store, upgrade_v2_v3)
@ -743,13 +746,13 @@ TEST (vote, validate)
ASSERT_TRUE (!init);
rai::keypair key1;
auto send1 (std::make_shared <rai::send_block> (0, key1.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
rai::vote vote1 (key1.pub, key1.prv, 2, send1);
auto vote1 (std::make_shared <rai::vote> (key1.pub, key1.prv, 2, send1));
rai::transaction transaction (store.environment, nullptr, true);
ASSERT_EQ (rai::vote_result::vote, vote1.validate (transaction, store));
vote1.signature.bytes [8] ^= 1;
ASSERT_EQ (rai::vote_result::invalid, vote1.validate (transaction, store));
rai::vote vote2 (key1.pub, key1.prv, 1, send1);
ASSERT_EQ (rai::vote_result::replay, vote2.validate (transaction, store));
ASSERT_EQ (rai::vote_result::vote, store.vote_validate (transaction, vote1));
vote1->signature.bytes [8] ^= 1;
ASSERT_EQ (rai::vote_result::invalid, store.vote_validate (transaction, vote1));
auto vote2 (std::make_shared <rai::vote> (key1.pub, key1.prv, 1, send1));
ASSERT_EQ (rai::vote_result::replay, store.vote_validate (transaction, vote2));
}
TEST (block_store, upgrade_v5_v6)
@ -882,11 +885,12 @@ TEST (block_store, sequence_flush)
rai::block_store store (init, path);
ASSERT_FALSE (init);
rai::transaction transaction (store.environment, nullptr, true);
rai::account account (0);
auto seq1 (store.sequence_atomic_inc (transaction, account));
auto seq2 (store.sequence_get (transaction, account));
ASSERT_NE (seq2, seq1);
store.sequence_flush(transaction);
auto seq3 (store.sequence_get (transaction, account));
ASSERT_EQ (seq3, seq1);
rai::keypair key1;
auto send1 (std::make_shared <rai::send_block> (0, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
auto vote1 (store.vote_generate (transaction, key1.pub, key1.prv, send1));
auto seq2 (store.vote_get (transaction, vote1->account));
ASSERT_EQ (nullptr, seq2);
store.vote_flush (transaction);
auto seq3 (store.vote_get (transaction, vote1->account));
ASSERT_EQ (*seq3, *vote1);
}

View file

@ -44,7 +44,7 @@ TEST (conflicts, add_existing)
node1.active.start (transaction, send2);
}
ASSERT_EQ (1, node1.active.roots.size ());
rai::vote vote1 (key2.pub, key2.prv, 0, send2);
auto vote1 (std::make_shared <rai::vote> (key2.pub, key2.prv, 0, send2));
node1.active.vote (vote1);
ASSERT_EQ (1, node1.active.roots.size ());
auto votes1 (node1.active.roots.find (send2->root ())->election);

View file

@ -733,9 +733,9 @@ TEST (votes, add_one)
}
auto votes1 (node1.active.roots.find (send1->root ())->election);
ASSERT_EQ (1, votes1->votes.rep_votes.size ());
rai::vote vote1 (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1);
auto vote1 (std::make_shared <rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1));
votes1->vote (vote1);
rai::vote vote2 (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send1);
auto vote2 (std::make_shared <rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send1));
votes1->vote (vote1);
ASSERT_EQ (2, votes1->votes.rep_votes.size ());
auto existing1 (votes1->votes.rep_votes.find (rai::test_genesis_key.pub));
@ -764,11 +764,11 @@ TEST (votes, add_two)
node1.active.start (transaction, send1);
}
auto votes1 (node1.active.roots.find (send1->root ())->election);
rai::vote vote1 (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1);
auto vote1 (std::make_shared <rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1));
votes1->vote (vote1);
rai::keypair key2;
auto send2 (std::make_shared <rai::send_block> (genesis.hash (), key2.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
rai::vote vote2 (key2.pub, key2.prv, 1, send2);
auto vote2 (std::make_shared <rai::vote> (key2.pub, key2.prv, 1, send2));
votes1->vote (vote2);
ASSERT_EQ (3, votes1->votes.rep_votes.size ());
ASSERT_NE (votes1->votes.rep_votes.end (), votes1->votes.rep_votes.find (rai::test_genesis_key.pub));
@ -798,11 +798,11 @@ TEST (votes, add_existing)
node1.active.start (transaction, send1);
}
auto votes1 (node1.active.roots.find (send1->root ())->election);
rai::vote vote1 (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1);
auto vote1 (std::make_shared <rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1));
votes1->vote (vote1);
rai::keypair key2;
auto send2 (std::make_shared <rai::send_block> (genesis.hash (), key2.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
rai::vote vote2 (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send2);
auto vote2 (std::make_shared <rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send2));
votes1->vote (vote2);
ASSERT_EQ (2, votes1->votes.rep_votes.size ());
ASSERT_NE (votes1->votes.rep_votes.end (), votes1->votes.rep_votes.find (rai::test_genesis_key.pub));
@ -830,11 +830,11 @@ TEST (votes, add_old)
node1.active.start (transaction, send1);
}
auto votes1 (node1.active.roots.find (send1->root ())->election);
rai::vote vote1 (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send1);
auto vote1 (std::make_shared <rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send1));
node1.vote_processor.vote (vote1, rai::endpoint ());
rai::keypair key2;
auto send2 (std::make_shared <rai::send_block> (genesis.hash (), key2.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
rai::vote vote2 (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send2);
auto vote2 (std::make_shared <rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send2));
node1.vote_processor.vote (vote2, rai::endpoint ());
ASSERT_EQ (2, votes1->votes.rep_votes.size ());
ASSERT_NE (votes1->votes.rep_votes.end (), votes1->votes.rep_votes.find (rai::test_genesis_key.pub));

View file

@ -76,7 +76,7 @@ TEST (message, publish_serialization)
TEST (message, confirm_ack_serialization)
{
rai::keypair key1;
rai::vote vote (key1.pub, key1.prv, 0, std::unique_ptr <rai::block> (new rai::send_block (0, 1, 2, key1.prv, 4, 5)));
auto vote (std::make_shared <rai::vote> (key1.pub, key1.prv, 0, std::unique_ptr <rai::block> (new rai::send_block (0, 1, 2, key1.prv, 4, 5))));
rai::confirm_ack con1 (vote);
std::vector <uint8_t> bytes;
{

View file

@ -59,7 +59,7 @@ TEST (message_parser, exact_confirm_ack_size)
test_visitor visitor;
rai::message_parser parser (visitor, system.work);
auto block (std::unique_ptr <rai::send_block> (new rai::send_block (1, 1, 2, rai::keypair ().prv, 4, system.work.generate (1))));
rai::vote vote (0, rai::keypair ().prv, 0, std::move (block));
auto vote (std::make_shared <rai::vote> (0, rai::keypair ().prv, 0, std::move (block)));
rai::confirm_ack message (vote);
std::vector <uint8_t> bytes;
{

View file

@ -149,11 +149,13 @@ TEST (network, send_discarded_publish)
{
rai::system system (24000, 2);
auto block (std::make_shared <rai::send_block> (1, 1, 2, rai::keypair ().prv, 4, system.work.generate (1)));
rai::genesis genesis;
{
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false);
system.nodes [0]->network.republish_block (transaction, block);
rai::genesis genesis;
ASSERT_EQ (genesis.hash (), system.nodes [0]->ledger.latest (transaction, rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub));
}
auto iterations (0);
while (system.nodes [1]->network.incoming.publish == 0)
{
@ -161,6 +163,7 @@ TEST (network, send_discarded_publish)
++iterations;
ASSERT_LT (iterations, 200);
}
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false);
ASSERT_EQ (genesis.hash (), system.nodes [0]->ledger.latest (transaction, rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub));
}
@ -267,7 +270,7 @@ TEST (receivable_processor, confirm_insufficient_pos)
node1.active.start (transaction, block1);
}
rai::keypair key1;
rai::vote vote (key1.pub, key1.prv, 0, block1);
auto vote (std::make_shared <rai::vote> (key1.pub, key1.prv, 0, block1));
rai::confirm_ack con1 (vote);
node1.process_message (con1, node1.network.endpoint ());
}
@ -284,7 +287,7 @@ TEST (receivable_processor, confirm_sufficient_pos)
rai::transaction transaction (node1.store.environment, nullptr, true);
node1.active.start (transaction, block1);
}
rai::vote vote (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 0, block1);
auto vote (std::make_shared <rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 0, block1));
rai::confirm_ack con1 (vote);
node1.process_message (con1, node1.network.endpoint ());
}

View file

@ -1045,7 +1045,7 @@ TEST (node, fork_no_vote_quorum)
auto send2 (std::make_shared <rai::send_block> (block->hash (), key2, (rai::genesis_amount / 4) - (node1.config.receive_minimum.number () * 2), rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (block->hash ())));
rai::raw_key key3;
ASSERT_FALSE (system.wallet (1)->store.fetch (rai::transaction (system.wallet (1)->store.environment, nullptr, false), key1, key3));
rai::vote vote (key1, key3, 0, send2);
auto vote (std::make_shared <rai::vote> (key1, key3, 0, send2));
rai::confirm_ack confirm (vote);
std::shared_ptr <std::vector <uint8_t>> bytes (new std::vector <uint8_t>);
{

View file

@ -207,7 +207,7 @@ void rai::message_parser::deserialize_confirm_ack (uint8_t const * buffer_a, siz
rai::confirm_ack incoming (error_l, stream);
if (!error_l && at_end (stream))
{
if (!pool.work_validate (*incoming.vote.block))
if (!pool.work_validate (*incoming.vote->block))
{
visitor.confirm_ack (incoming);
}
@ -363,15 +363,15 @@ bool rai::confirm_req::operator == (rai::confirm_req const & other_a) const
rai::confirm_ack::confirm_ack (bool & error_a, rai::stream & stream_a) :
message (error_a, stream_a),
vote (error_a, stream_a, block_type ())
vote (std::make_shared <rai::vote> (error_a, stream_a))
{
}
rai::confirm_ack::confirm_ack (rai::vote const & vote_a) :
rai::confirm_ack::confirm_ack (std::shared_ptr <rai::vote> vote_a) :
message (rai::message_type::confirm_ack),
vote (vote_a)
{
block_type_set (vote.block->type ());
block_type_set (vote->block->type ());
}
bool rai::confirm_ack::deserialize (rai::stream & stream_a)
@ -381,17 +381,17 @@ bool rai::confirm_ack::deserialize (rai::stream & stream_a)
assert (type == rai::message_type::confirm_ack);
if (!result)
{
result = read (stream_a, vote.account);
result = read (stream_a, vote->account);
if (!result)
{
result = read (stream_a, vote.signature);
result = read (stream_a, vote->signature);
if (!result)
{
result = read (stream_a, vote.sequence);
result = read (stream_a, vote->sequence);
if (!result)
{
vote.block = rai::deserialize_block (stream_a, block_type ());
result = vote.block == nullptr;
vote->block = rai::deserialize_block (stream_a, block_type ());
result = vote->block == nullptr;
}
}
}
@ -403,15 +403,12 @@ void rai::confirm_ack::serialize (rai::stream & stream_a)
{
assert (block_type () == rai::block_type::send || block_type () == rai::block_type::receive || block_type () == rai::block_type::open || block_type () == rai::block_type::change);
write_header (stream_a);
write (stream_a, vote.account);
write (stream_a, vote.signature);
write (stream_a, vote.sequence);
vote.block->serialize (stream_a);
vote->serialize (stream_a);
}
bool rai::confirm_ack::operator == (rai::confirm_ack const & other_a) const
{
auto result (vote.account == other_a.vote.account && *vote.block == *other_a.vote.block && vote.signature == other_a.vote.signature && vote.sequence == other_a.vote.sequence);
auto result (*vote == *other_a.vote);
return result;
}

View file

@ -171,12 +171,12 @@ class confirm_ack : public message
{
public:
confirm_ack (bool &, rai::stream &);
confirm_ack (rai::vote const &);
confirm_ack (std::shared_ptr <rai::vote>);
bool deserialize (rai::stream &) override;
void serialize (rai::stream &) override;
void visit (rai::message_visitor &) const override;
bool operator == (rai::confirm_ack const &) const;
rai::vote vote;
std::shared_ptr <rai::vote> vote;
};
class frontier_req : public message
{

View file

@ -173,8 +173,7 @@ bool confirm_block (MDB_txn * transaction_a, rai::node & node_a, T & list_a, std
node_a.wallets.foreach_representative (transaction_a, [&result, &block_a, &list_a, &node_a, &transaction_a] (rai::public_key const & pub_a, rai::raw_key const & prv_a)
{
result = true;
auto sequence (node_a.store.sequence_atomic_inc (transaction_a, pub_a));
rai::vote vote (pub_a, prv_a, sequence, block_a);
auto vote (node_a.store.vote_generate (transaction_a, pub_a, prv_a, block_a));
rai::confirm_ack confirm (vote);
std::shared_ptr <std::vector <uint8_t>> bytes (new std::vector <uint8_t>);
{
@ -236,11 +235,11 @@ void rai::network::republish_block (MDB_txn * transaction, std::shared_ptr <rai:
// 1) Only if they are a non-replay vote of a block that's actively settling. Settling blocks are limited by block PoW
// 2) Only if a vote for this block hasn't been received in the previous X second. This prevents rapid publishing of votes with increasing sequence numbers.
// 3) The rep has a weight > Y to prevent creating a lot of small-weight accounts to send out votes
void rai::network::republish_vote (std::chrono::system_clock::time_point const & last_vote, rai::vote const & vote_a)
void rai::network::republish_vote (std::chrono::system_clock::time_point const & last_vote, std::shared_ptr <rai::vote> vote_a)
{
if (last_vote < std::chrono::system_clock::now () - std::chrono::seconds (1))
{
if (node.weight (vote_a.account) > rai::Mxrb_ratio * 256)
if (node.weight (vote_a->account) > rai::Mxrb_ratio * 256)
{
rai::confirm_ack confirm (vote_a);
std::shared_ptr <std::vector <uint8_t>> bytes (new std::vector <uint8_t>);
@ -380,12 +379,12 @@ public:
{
if (node.config.logging.network_message_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Received confirm_ack message from %1% for %2%") % sender % message_a.vote.block->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Received confirm_ack message from %1% for %2%") % sender % message_a.vote->block->hash ().to_string ());
}
++node.network.incoming.confirm_ack;
node.peers.contacted (sender, message_a.version_using);
node.peers.insert (sender, message_a.version_using);
node.process_active (message_a.vote.block);
node.process_active (message_a.vote->block);
node.vote_processor.vote (message_a.vote, sender);
}
void bulk_pull (rai::bulk_pull const &) override
@ -991,12 +990,12 @@ node (node_a)
{
}
rai::vote_result rai::vote_processor::vote (rai::vote const & vote_a, rai::endpoint endpoint_a)
rai::vote_result rai::vote_processor::vote (std::shared_ptr <rai::vote> vote_a, rai::endpoint endpoint_a)
{
rai::vote_result result;
{
rai::transaction transaction (node.store.environment, nullptr, false);
result = vote_a.validate (transaction, node.store);
result = node.store.vote_validate (transaction, vote_a);
}
if (node.config.logging.vote_logging ())
{
@ -1013,7 +1012,7 @@ rai::vote_result rai::vote_processor::vote (rai::vote const & vote_a, rai::endpo
status = "Vote";
break;
}
BOOST_LOG (node.log) << boost::str (boost::format ("Vote from: %1% sequence: %2% block: %3% status: %4%") % vote_a.account.to_account () % std::to_string (vote_a.sequence) % vote_a.block->hash ().to_string () % status);
BOOST_LOG (node.log) << boost::str (boost::format ("Vote from: %1% sequence: %2% block: %3% status: %4%") % vote_a->account.to_account () % std::to_string (vote_a->sequence) % vote_a->block->hash ().to_string () % status);
}
switch (result)
{
@ -1431,19 +1430,19 @@ block_processor (*this)
this->network.send_keepalive (endpoint_a);
rep_query (*this, endpoint_a);
});
observers.vote.add ([this] (rai::vote const & vote_a, rai::endpoint const &)
observers.vote.add ([this] (std::shared_ptr <rai::vote> vote_a, rai::endpoint const &)
{
active.vote (vote_a);
});
observers.vote.add ([this] (rai::vote const & vote_a, rai::endpoint const &)
observers.vote.add ([this] (std::shared_ptr <rai::vote> vote_a, rai::endpoint const &)
{
this->gap_cache.vote (vote_a);
});
observers.vote.add ([this] (rai::vote const & vote_a, rai::endpoint const & endpoint_a)
observers.vote.add ([this] (std::shared_ptr <rai::vote> vote_a, rai::endpoint const & endpoint_a)
{
if (this->rep_crawler.exists (vote_a.block->hash ()))
if (this->rep_crawler.exists (vote_a->block->hash ()))
{
auto weight_l (weight (vote_a.account));
auto weight_l (weight (vote_a->account));
// We see a valid non-replay vote for a block we requested, this node is probably a representative
if (peers.rep_response (endpoint_a, weight_l))
{
@ -1515,11 +1514,11 @@ void rai::gap_cache::add (MDB_txn * transaction_a, std::shared_ptr <rai::block>
}
}
void rai::gap_cache::vote (rai::vote const & vote_a)
void rai::gap_cache::vote (std::shared_ptr <rai::vote> vote_a)
{
rai::transaction transaction (node.store.environment, nullptr, false);
std::lock_guard <std::mutex> lock (mutex);
auto hash (vote_a.block->hash ());
auto hash (vote_a->block->hash ());
auto existing (blocks.get <1> ().find (hash));
if (existing != blocks.get <1> ().end ())
{
@ -1574,7 +1573,7 @@ void rai::network::confirm_send (rai::confirm_ack const & confirm_a, std::shared
{
if (node.config.logging.network_publish_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Sending confirm_ack for block %1% to %2%") % confirm_a.vote.block->hash ().to_string () % endpoint_a);
BOOST_LOG (node.log) << boost::str (boost::format ("Sending confirm_ack for block %1% to %2%") % confirm_a.vote->block->hash ().to_string () % endpoint_a);
}
std::weak_ptr <rai::node> node_w (node.shared ());
++outgoing.confirm_ack;
@ -1880,7 +1879,7 @@ void rai::node::ongoing_vote_flush ()
{
{
rai::transaction transaction (store.environment, nullptr, true);
store.sequence_flush (transaction);
store.vote_flush (transaction);
}
std::weak_ptr <rai::node> node_w (shared_from_this ());
alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [node_w] ()
@ -2621,7 +2620,7 @@ void rai::election::compute_rep_votes (MDB_txn * transaction_a)
{
node.wallets.foreach_representative (transaction_a, [this, transaction_a] (rai::public_key const & pub_a, rai::raw_key const & prv_a)
{
rai::vote vote (pub_a, prv_a, this->node.store.sequence_atomic_inc (transaction_a, pub_a), last_winner);
auto vote (this->node.store.vote_generate (transaction_a, pub_a, prv_a, last_winner));
this->votes.vote (vote);
});
}
@ -2712,12 +2711,12 @@ void rai::election::confirm_cutoff (MDB_txn * transaction_a)
confirm_once (transaction_a);
}
void rai::election::vote (rai::vote const & vote_a)
void rai::election::vote (std::shared_ptr <rai::vote> vote_a)
{
node.network.republish_vote (last_vote, vote_a);
last_vote = std::chrono::system_clock::now ();
rai::transaction transaction (node.store.environment, nullptr, true);
assert (vote_a.validate (transaction, node.store) != rai::vote_result::invalid);
assert (node.store.vote_validate (transaction, vote_a) != rai::vote_result::invalid);
votes.vote (vote_a);
confirm_if_quarum (transaction);
}
@ -2798,12 +2797,12 @@ void rai::active_transactions::start (MDB_txn * transaction_a, std::shared_ptr <
}
// Validate a vote and apply it to the current election if one exists
void rai::active_transactions::vote (rai::vote const & vote_a)
void rai::active_transactions::vote (std::shared_ptr <rai::vote> vote_a)
{
std::shared_ptr <rai::election> election;
{
std::lock_guard <std::mutex> lock (mutex);
auto root (vote_a.block->root ());
auto root (vote_a->block->root ());
auto existing (roots.find (root));
if (existing != roots.end ())
{

View file

@ -42,7 +42,7 @@ class election : public std::enable_shared_from_this <rai::election>
void confirm_once (MDB_txn *);
public:
election (MDB_txn *, rai::node &, std::shared_ptr <rai::block>, std::function <void (std::shared_ptr <rai::block>)> const &);
void vote (rai::vote const &);
void vote (std::shared_ptr <rai::vote>);
// Check if we have vote quorum
bool have_quorum (MDB_txn *);
// Tell the network our view of the winner
@ -78,7 +78,7 @@ public:
// Start an election for a block
// Call action with confirmed block, may be different than what we started with
void start (MDB_txn *, std::shared_ptr <rai::block>, std::function <void (std::shared_ptr <rai::block>)> const & = [] (std::shared_ptr <rai::block>) {});
void vote (rai::vote const &);
void vote (std::shared_ptr <rai::vote>);
// Is the root of this block in the roots container
bool active (rai::block const &);
void announce_votes ();
@ -131,7 +131,7 @@ class gap_cache
public:
gap_cache (rai::node &);
void add (MDB_txn *, std::shared_ptr <rai::block>);
void vote (rai::vote const &);
void vote (std::shared_ptr <rai::vote>);
rai::uint128_t bootstrap_threshold (MDB_txn *);
void purge_old ();
boost::multi_index_container
@ -300,7 +300,7 @@ public:
void receive_action (boost::system::error_code const &, size_t);
void rpc_action (boost::system::error_code const &, size_t);
void rebroadcast_reps (std::shared_ptr <rai::block>);
void republish_vote (std::chrono::system_clock::time_point const &, rai::vote const &);
void republish_vote (std::chrono::system_clock::time_point const &, std::shared_ptr <rai::vote>);
void republish_block (MDB_txn *, std::shared_ptr <rai::block>);
void republish (rai::block_hash const &, std::shared_ptr <std::vector <uint8_t>>, rai::endpoint);
void publish_broadcast (std::vector <rai::peer_information> &, std::unique_ptr <rai::block>);
@ -408,7 +408,7 @@ class node_observers
public:
rai::observer_set <std::shared_ptr <rai::block>, rai::account const &, rai::amount const &> blocks;
rai::observer_set <rai::account const &, bool> wallet;
rai::observer_set <rai::vote const &, rai::endpoint const &> vote;
rai::observer_set <std::shared_ptr <rai::vote>, rai::endpoint const &> vote;
rai::observer_set <rai::endpoint const &> endpoint;
rai::observer_set <> disconnect;
rai::observer_set <> started;
@ -417,7 +417,7 @@ class vote_processor
{
public:
vote_processor (rai::node &);
rai::vote_result vote (rai::vote const &, rai::endpoint);
rai::vote_result vote (std::shared_ptr <rai::vote>, rai::endpoint);
rai::node & node;
};
// The network is crawled for representatives by ocassionally sending a unicast confirm_req for a specific block and watching to see if it's acknowledged with a vote.

View file

@ -172,23 +172,23 @@ id (block_a->root ())
rep_votes.insert (std::make_pair (rai::not_an_account, block_a));
}
rai::tally_result rai::votes::vote (rai::vote const & vote_a)
rai::tally_result rai::votes::vote (std::shared_ptr <rai::vote> vote_a)
{
rai::tally_result result;
auto existing (rep_votes.find (vote_a.account));
auto existing (rep_votes.find (vote_a->account));
if (existing == rep_votes.end ())
{
// Vote on this block hasn't been seen from rep before
result = rai::tally_result::vote;
rep_votes.insert (std::make_pair (vote_a.account, vote_a.block));
rep_votes.insert (std::make_pair (vote_a->account, vote_a->block));
}
else
{
if (!(*existing->second == *vote_a.block))
if (!(*existing->second == *vote_a->block))
{
// Rep changed their vote
result = rai::tally_result::changed;
existing->second = vote_a.block;
existing->second = vote_a->block;
}
else
{
@ -1534,7 +1534,7 @@ checksum (0)
error_a |= mdb_dbi_open (transaction, "unchecked", MDB_CREATE | MDB_DUPSORT, &unchecked) != 0;
error_a |= mdb_dbi_open (transaction, "unsynced", MDB_CREATE, &unsynced) != 0;
error_a |= mdb_dbi_open (transaction, "checksum", MDB_CREATE, &checksum) != 0;
error_a |= mdb_dbi_open (transaction, "sequence", MDB_CREATE, &sequence) != 0;
error_a |= mdb_dbi_open (transaction, "sequence", MDB_CREATE, &vote) != 0;
error_a |= mdb_dbi_open (transaction, "meta", MDB_CREATE, &meta) != 0;
if (!error_a)
{
@ -2480,72 +2480,111 @@ void rai::block_store::checksum_del (MDB_txn * transaction_a, uint64_t prefix, u
assert (status == 0);
}
void rai::block_store::sequence_flush (MDB_txn * transaction_a)
void rai::block_store::vote_flush (MDB_txn * transaction_a)
{
std::unordered_map <rai::account, uint64_t> sequence_cache_l;
std::unordered_map <rai::account, std::shared_ptr <rai::vote>> sequence_cache_l;
{
std::lock_guard <std::mutex> lock (sequence_mutex);
sequence_cache_l.swap (sequence_cache);
std::lock_guard <std::mutex> lock (vote_mutex);
sequence_cache_l.swap (vote_cache);
}
for (auto i (sequence_cache_l.begin ()), n (sequence_cache_l.end ()); i != n; ++i)
{
auto status1 (mdb_put (transaction_a, sequence, i->first.val (), rai::mdb_val (sizeof (i->second), &i->second), 0));
std::vector <uint8_t> vector;
{
rai::vectorstream stream (vector);
i->second->serialize (stream);
}
auto status1 (mdb_put (transaction_a, vote, i->first.val (), rai::mdb_val (vector.size (), vector.data ()), 0));
assert (status1 == 0);
}
}
uint64_t rai::block_store::sequence_get (MDB_txn * transaction_a, rai::account const & account_a)
std::shared_ptr <rai::vote> rai::block_store::vote_get (MDB_txn * transaction_a, rai::account const & account_a)
{
uint64_t result (0);
std::shared_ptr <rai::vote> result;
MDB_val value;
auto status (mdb_get (transaction_a, sequence, account_a.val (), &value));
auto status (mdb_get (transaction_a, vote, account_a.val (), &value));
assert (status == 0 || status == MDB_NOTFOUND);
if (status == 0)
{
rai::bufferstream stream (reinterpret_cast <uint8_t const *> (value.mv_data), value.mv_size);
auto error (rai::read (stream, result));
assert (!error);
result = std::make_shared <rai::vote> (value);
assert (result != nullptr);
}
return result;
}
uint64_t rai::block_store::sequence_current (MDB_txn * transaction_a, rai::account const & account_a)
std::shared_ptr <rai::vote> rai::block_store::vote_current (MDB_txn * transaction_a, rai::account const & account_a)
{
assert (!sequence_mutex.try_lock ());
uint64_t result (0);
auto existing (sequence_cache.find (account_a));
if (existing != sequence_cache.end ())
assert (!vote_mutex.try_lock ());
std::shared_ptr <rai::vote> result;
auto existing (vote_cache.find (account_a));
if (existing != vote_cache.end ())
{
result = existing->second;
}
else
{
result = sequence_get (transaction_a, account_a);
result = vote_get (transaction_a, account_a);
}
return result;
}
uint64_t rai::block_store::sequence_atomic_inc (MDB_txn * transaction_a, rai::account const & account_a)
std::shared_ptr <rai::vote> rai::block_store::vote_generate (MDB_txn * transaction_a, rai::account const & account_a, rai::raw_key const & key_a, std::shared_ptr <rai::block> block_a)
{
std::lock_guard <std::mutex> lock (sequence_mutex);
auto result (sequence_current (transaction_a, account_a));
result += 1;
sequence_cache [account_a] = result;
return result;
}
uint64_t rai::block_store::sequence_atomic_observe (MDB_txn * transaction_a, rai::account const & account_a, uint64_t sequence_a)
{
std::lock_guard <std::mutex> lock (sequence_mutex);
auto current (sequence_current (transaction_a, account_a));
auto result (std::max (current, sequence_a));
if (sequence_a > current)
std::shared_ptr <rai::vote> result;
uint64_t sequence = 0;
{
sequence_cache [account_a] = sequence_a;
std::lock_guard <std::mutex> lock (vote_mutex);
result = vote_current (transaction_a, account_a);
sequence = (result ? result->sequence : 0) + 1;
}
result = std::make_shared <rai::vote> (account_a, key_a, sequence, block_a);
vote_cache [account_a] = result;
return result;
}
std::shared_ptr <rai::vote> rai::block_store::vote_max (MDB_txn * transaction_a, std::shared_ptr <rai::vote> vote_a)
{
std::lock_guard <std::mutex> lock (vote_mutex);
auto current (vote_current (transaction_a, vote_a->account));
auto result (vote_a);
if (current != nullptr)
{
if (current->sequence > result->sequence)
{
result = current;
}
}
vote_cache [vote_a->account] = result;
return result;
}
rai::vote_result rai::block_store::vote_validate (MDB_txn * transaction_a, std::shared_ptr <rai::vote> vote_a)
{
auto result (rai::vote_result::invalid);
// Reject unsigned votes
if (!rai::validate_message (vote_a->account, vote_a->hash (), vote_a->signature))
{
result = rai::vote_result::replay;
// Make sure this sequence number is > any we've seen from this account before
if (vote_max (transaction_a, vote_a) == vote_a)
{
result = rai::vote_result::vote;
}
}
return result;
}
bool rai::vote::operator == (rai::vote const & other_a) const
{
return sequence == other_a.sequence && *block == *other_a.block && account == other_a.account && signature == other_a.signature;
}
bool rai::vote::operator != (rai::vote const & other_a) const
{
return ! (*this == other_a);
}
namespace
{
class root_visitor : public rai::block_visitor
@ -3301,6 +3340,27 @@ signature (other_a.signature)
{
}
rai::vote::vote (bool & error_a, rai::stream & stream_a)
{
if (!error_a)
{
error_a = rai::read (stream_a, account.bytes);
if (!error_a)
{
error_a = rai::read (stream_a, signature.bytes);
if (!error_a)
{
error_a = rai::read (stream_a, sequence);
if (!error_a)
{
block = rai::deserialize_block (stream_a);
error_a = block == nullptr;
}
}
}
}
}
rai::vote::vote (bool & error_a, rai::stream & stream_a, rai::block_type type_a)
{
if (!error_a)
@ -3330,6 +3390,19 @@ signature (rai::sign_message (prv_a, account_a, hash ()))
{
}
rai::vote::vote (MDB_val const & value_a)
{
rai::bufferstream stream (reinterpret_cast <uint8_t const *> (value_a.mv_data), value_a.mv_size);
auto error (rai::read (stream, account.bytes));
assert (!error);
error = rai::read (stream, signature.bytes);
assert (!error);
error = rai::read (stream, sequence);
assert (!error);
block = rai::deserialize_block (stream);
assert (block != nullptr);
}
rai::uint256_union rai::vote::hash () const
{
rai::uint256_union result;
@ -3346,20 +3419,12 @@ rai::uint256_union rai::vote::hash () const
return result;
}
rai::vote_result rai::vote::validate (MDB_txn * transaction_a, rai::block_store & store_a) const
void rai::vote::serialize (rai::stream & stream_a)
{
auto result (rai::vote_result::invalid);
// Reject unsigned votes
if (!rai::validate_message (account, hash (), signature))
{
result = rai::vote_result::replay;
// Make sure this sequence number is > any we've seen from this account before
if (store_a.sequence_atomic_observe (transaction_a, account, sequence) == sequence)
{
result = rai::vote_result::vote;
}
}
return result;
write (stream_a, account);
write (stream_a, signature);
write (stream_a, sequence);
rai::serialize_block (stream_a, *block);
}
rai::genesis::genesis ()

View file

@ -313,6 +313,33 @@ public:
size_t open;
size_t change;
};
enum class vote_result
{
invalid, // Vote is not signed correctly
replay, // Vote does not have the highest sequence number, it's a replay
vote // Vote has the highest sequence number
};
class vote
{
public:
vote () = default;
vote (rai::vote const &);
vote (bool &, rai::stream &);
vote (bool &, rai::stream &, rai::block_type);
vote (rai::account const &, rai::raw_key const &, uint64_t, std::shared_ptr <rai::block>);
vote (MDB_val const &);
rai::uint256_union hash () const;
bool operator == (rai::vote const &) const;
bool operator != (rai::vote const &) const;
void serialize (rai::stream &);
// Vote round sequence number
uint64_t sequence;
std::shared_ptr <rai::block> block;
// Account that's voting
rai::account account;
// Signature of sequence + block hash
rai::signature signature;
};
class block_store
{
public:
@ -388,13 +415,18 @@ public:
bool checksum_get (MDB_txn *, uint64_t, uint8_t, rai::checksum &);
void checksum_del (MDB_txn *, uint64_t, uint8_t);
uint64_t sequence_get (MDB_txn *, rai::account const &);
uint64_t sequence_atomic_inc (MDB_txn *, rai::account const &);
uint64_t sequence_atomic_observe (MDB_txn *, rai::account const &, uint64_t);
uint64_t sequence_current (MDB_txn *, rai::account const &);
void sequence_flush (MDB_txn *);
std::mutex sequence_mutex;
std::unordered_map <rai::account, uint64_t> sequence_cache;
rai::vote_result vote_validate (MDB_txn *, std::shared_ptr <rai::vote>);
// Return latest vote for an account from store
std::shared_ptr <rai::vote> vote_get (MDB_txn *, rai::account const &);
// Populate vote with the next sequence number
std::shared_ptr <rai::vote> vote_generate (MDB_txn *, rai::account const &, rai::raw_key const &, std::shared_ptr <rai::block>);
// Return either vote or the stored vote with a higher sequence number
std::shared_ptr <rai::vote> vote_max (MDB_txn *, std::shared_ptr <rai::vote>);
// Return latest vote for an account considering the vote cache
std::shared_ptr <rai::vote> vote_current (MDB_txn *, rai::account const &);
void vote_flush (MDB_txn *);
std::mutex vote_mutex;
std::unordered_map <rai::account, std::shared_ptr <rai::vote>> vote_cache;
void version_put (MDB_txn *, int);
int version_get (MDB_txn *);
@ -432,8 +464,8 @@ public:
MDB_dbi unsynced;
// (uint56_t, uint8_t) -> block_hash // Mapping of region to checksum
MDB_dbi checksum;
// account -> uint64_t // Highest vote sequence observed for account
MDB_dbi sequence;
// account -> uint64_t // Highest vote observed for account
MDB_dbi vote;
// uint256_union -> ? // Meta information about block store
MDB_dbi meta;
};
@ -457,29 +489,6 @@ public:
rai::account account;
rai::amount amount;
};
enum class vote_result
{
invalid, // Vote is not signed correctly
replay, // Vote does not have the highest sequence number, it's a replay
vote // Vote has the highest sequence number
};
class vote
{
public:
vote () = default;
vote (rai::vote const &);
vote (bool &, rai::stream &, rai::block_type);
vote (rai::account const &, rai::raw_key const &, uint64_t, std::shared_ptr <rai::block>);
rai::uint256_union hash () const;
rai::vote_result validate (MDB_txn *, rai::block_store &) const;
// Vote round sequence number
uint64_t sequence;
std::shared_ptr <rai::block> block;
// Account that's voting
rai::account account;
// Signature of sequence + block hash
rai::signature signature;
};
enum class tally_result
{
vote,
@ -490,7 +499,7 @@ class votes
{
public:
votes (std::shared_ptr <rai::block>);
rai::tally_result vote (rai::vote const &);
rai::tally_result vote (std::shared_ptr <rai::vote>);
// Root block of fork
rai::block_hash id;
// All votes received by account