From 626a2fbc1b8142e35d1954cbde50e7aec9a161e0 Mon Sep 17 00:00:00 2001 From: SergiySW Date: Thu, 16 Nov 2017 01:03:18 +0300 Subject: [PATCH] Processing multiple transactions in single process_receive_many --- rai/node/node.cpp | 35 ++++++++++++++++++++++++----------- rai/node/node.hpp | 1 + 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/rai/node/node.cpp b/rai/node/node.cpp index a8fb760f..b45b58b7 100755 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -1105,10 +1105,18 @@ void rai::block_processor::process_blocks () if (!blocks.empty ()) { { - auto info (blocks.front ()); - blocks.pop_front (); + auto completed (blocks.front ().second); + std::vector > blocks_processing; + while (!blocks.empty () && blocks_processing.size () < rai::blocks_per_transaction) + { + auto info (blocks.front ()); + blocks_processing.push_back (info.first); + blocks.pop_front (); + } + // Move first blocks to the end of blocks_processing + std::reverse (blocks_processing.begin (), blocks_processing.end ()); lock.unlock (); - process_receive_many (info.first, info.second); + process_receive_many (blocks_processing, completed); } // Let other threads get an opportunity to transaction lock std::this_thread::yield (); @@ -1126,18 +1134,23 @@ void rai::block_processor::process_blocks () void rai::block_processor::process_receive_many (std::shared_ptr block_a, std::function )> completed_a) { - std::vector > blocks; - blocks.push_back (block_a); - while (!blocks.empty ()) + std::vector > blocks_processing; + blocks_processing.push_back (block_a); + process_receive_many (blocks_processing, completed_a); +} + +void rai::block_processor::process_receive_many (std::vector > blocks_processing, std::function )> completed_a) +{ + while (!blocks_processing.empty ()) { std::deque , rai::process_return>> progress; { rai::transaction transaction (node.store.environment, nullptr, true); auto count (0); - while (!blocks.empty () && count < rai::blocks_per_transaction) + while (!blocks_processing.empty () && count < rai::blocks_per_transaction) { - auto block (blocks.back ()); - blocks.pop_back (); + auto block (blocks_processing.back ()); + blocks_processing.pop_back (); auto hash (block->hash ()); auto process_result (process_receive_one (transaction, block)); completed_a (transaction, process_result, block); @@ -1153,7 +1166,7 @@ void rai::block_processor::process_receive_many (std::shared_ptr bl for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i) { node.store.unchecked_del (transaction, hash, **i); - blocks.push_back (std::move (*i)); + blocks_processing.push_back (std::move (*i)); } std::lock_guard lock (node.gap_cache.mutex); node.gap_cache.blocks.get <1> ().erase (hash); @@ -1604,7 +1617,7 @@ void rai::network::confirm_send (rai::confirm_ack const & confirm_a, std::shared void rai::node::process_active (std::shared_ptr incoming) { block_arrival.add (incoming->hash ()); - block_processor.process_receive_many (incoming); + block_processor.add (incoming); if (rai::rai_network == rai::rai_networks::rai_test_network) { block_processor.flush (); diff --git a/rai/node/node.hpp b/rai/node/node.hpp index fcae922b..7c30d2ba 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -441,6 +441,7 @@ public: void flush (); void add (std::shared_ptr , std::function )> = [] (MDB_txn *, rai::process_return, std::shared_ptr ) {}); void process_receive_many (std::shared_ptr , std::function )> = [] (MDB_txn *, rai::process_return, std::shared_ptr ) {}); + void process_receive_many (std::vector >, std::function )> = [] (MDB_txn *, rai::process_return, std::shared_ptr ) {}); rai::process_return process_receive_one (MDB_txn *, std::shared_ptr ); private: void process_blocks ();