This commit is contained in:
clemahieu 2017-12-06 01:52:22 -06:00
commit a897a57692
2 changed files with 25 additions and 11 deletions

View file

@ -1105,10 +1105,18 @@ void rai::block_processor::process_blocks ()
if (!blocks.empty ())
{
{
auto info (blocks.front ());
blocks.pop_front ();
auto completed (blocks.front ().second);
std::vector <std::shared_ptr <rai::block>> blocks_processing;
while (!blocks.empty () && blocks_processing.size () < rai::blocks_per_transaction)
{
auto info (blocks.front ());
blocks_processing.push_back (info.first);
blocks.pop_front ();
}
// Move first blocks to the end of blocks_processing
std::reverse (blocks_processing.begin (), blocks_processing.end ());
lock.unlock ();
process_receive_many (info.first, info.second);
process_receive_many (blocks_processing, completed);
}
// Let other threads get an opportunity to transaction lock
std::this_thread::yield ();
@ -1126,18 +1134,23 @@ void rai::block_processor::process_blocks ()
void rai::block_processor::process_receive_many (std::shared_ptr <rai::block> block_a, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> completed_a)
{
std::vector <std::shared_ptr <rai::block>> blocks;
blocks.push_back (block_a);
while (!blocks.empty ())
std::vector <std::shared_ptr <rai::block>> blocks_processing;
blocks_processing.push_back (block_a);
process_receive_many (blocks_processing, completed_a);
}
void rai::block_processor::process_receive_many (std::vector <std::shared_ptr <rai::block>> blocks_processing, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> completed_a)
{
while (!blocks_processing.empty ())
{
std::deque <std::pair <std::shared_ptr <rai::block>, rai::process_return>> progress;
{
rai::transaction transaction (node.store.environment, nullptr, true);
auto count (0);
while (!blocks.empty () && count < rai::blocks_per_transaction)
while (!blocks_processing.empty () && count < rai::blocks_per_transaction)
{
auto block (blocks.back ());
blocks.pop_back ();
auto block (blocks_processing.back ());
blocks_processing.pop_back ();
auto hash (block->hash ());
auto process_result (process_receive_one (transaction, block));
completed_a (transaction, process_result, block);
@ -1153,7 +1166,7 @@ void rai::block_processor::process_receive_many (std::shared_ptr <rai::block> bl
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
node.store.unchecked_del (transaction, hash, **i);
blocks.push_back (std::move (*i));
blocks_processing.push_back (std::move (*i));
}
std::lock_guard <std::mutex> lock (node.gap_cache.mutex);
node.gap_cache.blocks.get <1> ().erase (hash);
@ -1608,7 +1621,7 @@ void rai::network::confirm_send (rai::confirm_ack const & confirm_a, std::shared
void rai::node::process_active (std::shared_ptr <rai::block> incoming)
{
block_arrival.add (incoming->hash ());
block_processor.process_receive_many (incoming);
block_processor.add (incoming);
if (rai::rai_network == rai::rai_networks::rai_test_network)
{
block_processor.flush ();

View file

@ -441,6 +441,7 @@ public:
void flush ();
void add (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
void process_receive_many (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
void process_receive_many (std::vector <std::shared_ptr <rai::block>>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
rai::process_return process_receive_one (MDB_txn *, std::shared_ptr <rai::block>);
private:
void process_blocks ();