Single active_transactions mutex for vote_blocking (#1350)

This commit is contained in:
SergiySW 2018-11-14 21:07:48 +03:00 committed by Roy Keene
commit ba2078d032
4 changed files with 42 additions and 8 deletions

View file

@ -786,6 +786,7 @@ TEST (votes, check_signature)
node1.active.start (send1);
auto votes1 (node1.active.roots.find (send1->root ())->election);
ASSERT_EQ (1, votes1->last_votes.size ());
std::unique_lock<std::mutex> lock (node1.active.mutex);
auto vote1 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1));
vote1->signature.bytes[0] ^= 1;
ASSERT_EQ (rai::vote_code::invalid, node1.vote_processor.vote_blocking (transaction, vote1, rai::endpoint (boost::asio::ip::address_v6 (), 0)));
@ -906,12 +907,16 @@ TEST (votes, add_old)
node1.active.start (send1);
auto votes1 (node1.active.roots.find (send1->root ())->election);
auto vote1 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send1));
std::unique_lock<std::mutex> lock (node1.active.mutex);
node1.vote_processor.vote_blocking (transaction, vote1, node1.network.endpoint ());
lock.unlock ();
rai::keypair key2;
auto send2 (std::make_shared<rai::send_block> (genesis.hash (), key2.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
auto vote2 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send2));
votes1->last_votes[rai::test_genesis_key.pub].time = std::chrono::steady_clock::now () - std::chrono::seconds (20);
lock.lock ();
node1.vote_processor.vote_blocking (transaction, vote2, node1.network.endpoint ());
lock.unlock ();
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (rai::test_genesis_key.pub));
ASSERT_EQ (send1->hash (), votes1->last_votes[rai::test_genesis_key.pub].hash);
@ -942,12 +947,16 @@ TEST (votes, add_old_different_account)
ASSERT_EQ (1, votes2->last_votes.size ());
auto vote1 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send1));
auto transaction (system.nodes[0]->store.tx_begin ());
std::unique_lock<std::mutex> lock (node1.active.mutex);
auto vote_result1 (node1.vote_processor.vote_blocking (transaction, vote1, node1.network.endpoint ()));
lock.unlock ();
ASSERT_EQ (rai::vote_code::vote, vote_result1);
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_EQ (1, votes2->last_votes.size ());
lock.lock ();
auto vote2 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send2));
auto vote_result2 (node1.vote_processor.vote_blocking (transaction, vote2, node1.network.endpoint ()));
lock.unlock ();
ASSERT_EQ (rai::vote_code::vote, vote_result2);
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_EQ (2, votes2->last_votes.size ());
@ -975,11 +984,15 @@ TEST (votes, add_cooldown)
node1.active.start (send1);
auto votes1 (node1.active.roots.find (send1->root ())->election);
auto vote1 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1));
std::unique_lock<std::mutex> lock (node1.active.mutex);
node1.vote_processor.vote_blocking (transaction, vote1, node1.network.endpoint ());
lock.unlock ();
rai::keypair key2;
auto send2 (std::make_shared<rai::send_block> (genesis.hash (), key2.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
auto vote2 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send2));
lock.lock ();
node1.vote_processor.vote_blocking (transaction, vote2, node1.network.endpoint ());
lock.unlock ();
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (rai::test_genesis_key.pub));
ASSERT_EQ (send1->hash (), votes1->last_votes[rai::test_genesis_key.pub].hash);

View file

@ -1824,6 +1824,7 @@ TEST (node, fork_invalid_block_signature_vote_by_hash)
auto vote (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 0, vote_blocks));
{
auto transaction (system.nodes[0]->store.tx_begin_read ());
std::unique_lock<std::mutex> lock (system.nodes[0]->active.mutex);
system.nodes[0]->vote_processor.vote_blocking (transaction, vote, system.nodes[0]->network.endpoint ());
}
while (system.nodes[0]->block (send1->hash ()))

View file

@ -812,10 +812,19 @@ void rai::vote_processor::process_loop ()
active = true;
lock.unlock ();
{
std::unique_lock<std::mutex> active_single_lock (node.active.mutex);
auto transaction (node.store.tx_begin_read ());
uint64_t count (1);
for (auto & i : votes_l)
{
vote_blocking (transaction, i.first, i.second);
// Free active_transactions mutex each 100 processed votes
if (count % 100 == 0)
{
active_single_lock.unlock ();
active_single_lock.lock ();
}
count++;
}
}
lock.lock ();
@ -858,15 +867,17 @@ void rai::vote_processor::vote (std::shared_ptr<rai::vote> vote_a, rai::endpoint
}
}
// node.active.mutex lock required
rai::vote_code rai::vote_processor::vote_blocking (rai::transaction const & transaction_a, std::shared_ptr<rai::vote> vote_a, rai::endpoint endpoint_a)
{
assert (endpoint_a.address ().is_v6 ());
assert (!node.active.mutex.try_lock ());
auto result (rai::vote_code::invalid);
if (!vote_a->validate ())
{
auto max_vote (node.store.vote_max (transaction_a, vote_a));
result = rai::vote_code::replay;
if (!node.active.vote (vote_a))
if (!node.active.vote (vote_a, true))
{
result = rai::vote_code::vote;
}
@ -1480,7 +1491,7 @@ vote_uniquer (block_uniquer)
{
BOOST_LOG (log) << boost::str (boost::format ("Found a representative at %1%") % endpoint_a);
// Rebroadcasting all active votes to new representative
auto blocks (this->active.list_blocks ());
auto blocks (this->active.list_blocks (true));
for (auto i (blocks.begin ()), n (blocks.end ()); i != n; ++i)
{
if (*i != nullptr)
@ -3086,13 +3097,17 @@ bool rai::active_transactions::add (std::pair<std::shared_ptr<rai::block>, std::
}
// Validate a vote and apply it to the current election if one exists
bool rai::active_transactions::vote (std::shared_ptr<rai::vote> vote_a)
bool rai::active_transactions::vote (std::shared_ptr<rai::vote> vote_a, bool single_lock)
{
std::shared_ptr<rai::election> election;
bool replay (false);
bool processed (false);
{
std::lock_guard<std::mutex> lock (mutex);
std::unique_lock<std::mutex> lock;
if (!single_lock)
{
lock = std::unique_lock<std::mutex> (mutex);
}
for (auto vote_block : vote_a->blocks)
{
rai::election_vote_result result;
@ -3132,10 +3147,14 @@ bool rai::active_transactions::active (rai::block const & block_a)
}
// List of active blocks in elections
std::deque<std::shared_ptr<rai::block>> rai::active_transactions::list_blocks ()
std::deque<std::shared_ptr<rai::block>> rai::active_transactions::list_blocks (bool single_lock)
{
std::deque<std::shared_ptr<rai::block>> result;
std::lock_guard<std::mutex> lock (mutex);
std::unique_lock<std::mutex> lock;
if (!single_lock)
{
lock = std::unique_lock<std::mutex> (mutex);
}
for (auto i (roots.begin ()), n (roots.end ()); i != n; ++i)
{
result.push_back (i->election->status.winner);

View file

@ -98,10 +98,10 @@ public:
bool add (std::pair<std::shared_ptr<rai::block>, std::shared_ptr<rai::block>>, std::function<void(std::shared_ptr<rai::block>)> const & = [](std::shared_ptr<rai::block>) {});
// If this returns true, the vote is a replay
// If this returns false, the vote may or may not be a replay
bool vote (std::shared_ptr<rai::vote>);
bool vote (std::shared_ptr<rai::vote>, bool = false);
// Is the root of this block in the roots container
bool active (rai::block const &);
std::deque<std::shared_ptr<rai::block>> list_blocks ();
std::deque<std::shared_ptr<rai::block>> list_blocks (bool = false);
void erase (rai::block const &);
void stop ();
bool publish (std::shared_ptr<rai::block> block_a);
@ -341,6 +341,7 @@ class vote_processor
public:
vote_processor (rai::node &);
void vote (std::shared_ptr<rai::vote>, rai::endpoint);
// node.active.mutex lock required
rai::vote_code vote_blocking (rai::transaction const &, std::shared_ptr<rai::vote>, rai::endpoint);
void flush ();
rai::node & node;