From ba2078d032c886cf1371bedb79a0b496ffa45526 Mon Sep 17 00:00:00 2001 From: SergiySW Date: Wed, 14 Nov 2018 21:07:48 +0300 Subject: [PATCH] Single active_transactions mutex for vote_blocking (#1350) --- rai/core_test/ledger.cpp | 13 +++++++++++++ rai/core_test/node.cpp | 1 + rai/node/node.cpp | 31 +++++++++++++++++++++++++------ rai/node/node.hpp | 5 +++-- 4 files changed, 42 insertions(+), 8 deletions(-) diff --git a/rai/core_test/ledger.cpp b/rai/core_test/ledger.cpp index df72f646..c4c7c816 100644 --- a/rai/core_test/ledger.cpp +++ b/rai/core_test/ledger.cpp @@ -786,6 +786,7 @@ TEST (votes, check_signature) node1.active.start (send1); auto votes1 (node1.active.roots.find (send1->root ())->election); ASSERT_EQ (1, votes1->last_votes.size ()); + std::unique_lock lock (node1.active.mutex); auto vote1 (std::make_shared (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1)); vote1->signature.bytes[0] ^= 1; ASSERT_EQ (rai::vote_code::invalid, node1.vote_processor.vote_blocking (transaction, vote1, rai::endpoint (boost::asio::ip::address_v6 (), 0))); @@ -906,12 +907,16 @@ TEST (votes, add_old) node1.active.start (send1); auto votes1 (node1.active.roots.find (send1->root ())->election); auto vote1 (std::make_shared (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send1)); + std::unique_lock lock (node1.active.mutex); node1.vote_processor.vote_blocking (transaction, vote1, node1.network.endpoint ()); + lock.unlock (); rai::keypair key2; auto send2 (std::make_shared (genesis.hash (), key2.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); auto vote2 (std::make_shared (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send2)); votes1->last_votes[rai::test_genesis_key.pub].time = std::chrono::steady_clock::now () - std::chrono::seconds (20); + lock.lock (); node1.vote_processor.vote_blocking (transaction, vote2, node1.network.endpoint ()); + lock.unlock (); ASSERT_EQ (2, votes1->last_votes.size ()); ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (rai::test_genesis_key.pub)); ASSERT_EQ (send1->hash (), votes1->last_votes[rai::test_genesis_key.pub].hash); @@ -942,12 +947,16 @@ TEST (votes, add_old_different_account) ASSERT_EQ (1, votes2->last_votes.size ()); auto vote1 (std::make_shared (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send1)); auto transaction (system.nodes[0]->store.tx_begin ()); + std::unique_lock lock (node1.active.mutex); auto vote_result1 (node1.vote_processor.vote_blocking (transaction, vote1, node1.network.endpoint ())); + lock.unlock (); ASSERT_EQ (rai::vote_code::vote, vote_result1); ASSERT_EQ (2, votes1->last_votes.size ()); ASSERT_EQ (1, votes2->last_votes.size ()); + lock.lock (); auto vote2 (std::make_shared (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send2)); auto vote_result2 (node1.vote_processor.vote_blocking (transaction, vote2, node1.network.endpoint ())); + lock.unlock (); ASSERT_EQ (rai::vote_code::vote, vote_result2); ASSERT_EQ (2, votes1->last_votes.size ()); ASSERT_EQ (2, votes2->last_votes.size ()); @@ -975,11 +984,15 @@ TEST (votes, add_cooldown) node1.active.start (send1); auto votes1 (node1.active.roots.find (send1->root ())->election); auto vote1 (std::make_shared (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1)); + std::unique_lock lock (node1.active.mutex); node1.vote_processor.vote_blocking (transaction, vote1, node1.network.endpoint ()); + lock.unlock (); rai::keypair key2; auto send2 (std::make_shared (genesis.hash (), key2.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); auto vote2 (std::make_shared (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send2)); + lock.lock (); node1.vote_processor.vote_blocking (transaction, vote2, node1.network.endpoint ()); + lock.unlock (); ASSERT_EQ (2, votes1->last_votes.size ()); ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (rai::test_genesis_key.pub)); ASSERT_EQ (send1->hash (), votes1->last_votes[rai::test_genesis_key.pub].hash); diff --git a/rai/core_test/node.cpp b/rai/core_test/node.cpp index 5b1459c5..eeb0620e 100644 --- a/rai/core_test/node.cpp +++ b/rai/core_test/node.cpp @@ -1824,6 +1824,7 @@ TEST (node, fork_invalid_block_signature_vote_by_hash) auto vote (std::make_shared (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 0, vote_blocks)); { auto transaction (system.nodes[0]->store.tx_begin_read ()); + std::unique_lock lock (system.nodes[0]->active.mutex); system.nodes[0]->vote_processor.vote_blocking (transaction, vote, system.nodes[0]->network.endpoint ()); } while (system.nodes[0]->block (send1->hash ())) diff --git a/rai/node/node.cpp b/rai/node/node.cpp index bc206823..24634653 100644 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -812,10 +812,19 @@ void rai::vote_processor::process_loop () active = true; lock.unlock (); { + std::unique_lock active_single_lock (node.active.mutex); auto transaction (node.store.tx_begin_read ()); + uint64_t count (1); for (auto & i : votes_l) { vote_blocking (transaction, i.first, i.second); + // Free active_transactions mutex each 100 processed votes + if (count % 100 == 0) + { + active_single_lock.unlock (); + active_single_lock.lock (); + } + count++; } } lock.lock (); @@ -858,15 +867,17 @@ void rai::vote_processor::vote (std::shared_ptr vote_a, rai::endpoint } } +// node.active.mutex lock required rai::vote_code rai::vote_processor::vote_blocking (rai::transaction const & transaction_a, std::shared_ptr vote_a, rai::endpoint endpoint_a) { assert (endpoint_a.address ().is_v6 ()); + assert (!node.active.mutex.try_lock ()); auto result (rai::vote_code::invalid); if (!vote_a->validate ()) { auto max_vote (node.store.vote_max (transaction_a, vote_a)); result = rai::vote_code::replay; - if (!node.active.vote (vote_a)) + if (!node.active.vote (vote_a, true)) { result = rai::vote_code::vote; } @@ -1480,7 +1491,7 @@ vote_uniquer (block_uniquer) { BOOST_LOG (log) << boost::str (boost::format ("Found a representative at %1%") % endpoint_a); // Rebroadcasting all active votes to new representative - auto blocks (this->active.list_blocks ()); + auto blocks (this->active.list_blocks (true)); for (auto i (blocks.begin ()), n (blocks.end ()); i != n; ++i) { if (*i != nullptr) @@ -3086,13 +3097,17 @@ bool rai::active_transactions::add (std::pair, std:: } // Validate a vote and apply it to the current election if one exists -bool rai::active_transactions::vote (std::shared_ptr vote_a) +bool rai::active_transactions::vote (std::shared_ptr vote_a, bool single_lock) { std::shared_ptr election; bool replay (false); bool processed (false); { - std::lock_guard lock (mutex); + std::unique_lock lock; + if (!single_lock) + { + lock = std::unique_lock (mutex); + } for (auto vote_block : vote_a->blocks) { rai::election_vote_result result; @@ -3132,10 +3147,14 @@ bool rai::active_transactions::active (rai::block const & block_a) } // List of active blocks in elections -std::deque> rai::active_transactions::list_blocks () +std::deque> rai::active_transactions::list_blocks (bool single_lock) { std::deque> result; - std::lock_guard lock (mutex); + std::unique_lock lock; + if (!single_lock) + { + lock = std::unique_lock (mutex); + } for (auto i (roots.begin ()), n (roots.end ()); i != n; ++i) { result.push_back (i->election->status.winner); diff --git a/rai/node/node.hpp b/rai/node/node.hpp index 93f4fc7d..1655e861 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -98,10 +98,10 @@ public: bool add (std::pair, std::shared_ptr>, std::function)> const & = [](std::shared_ptr) {}); // If this returns true, the vote is a replay // If this returns false, the vote may or may not be a replay - bool vote (std::shared_ptr); + bool vote (std::shared_ptr, bool = false); // Is the root of this block in the roots container bool active (rai::block const &); - std::deque> list_blocks (); + std::deque> list_blocks (bool = false); void erase (rai::block const &); void stop (); bool publish (std::shared_ptr block_a); @@ -341,6 +341,7 @@ class vote_processor public: vote_processor (rai::node &); void vote (std::shared_ptr, rai::endpoint); + // node.active.mutex lock required rai::vote_code vote_blocking (rai::transaction const &, std::shared_ptr, rai::endpoint); void flush (); rai::node & node;