Processing blocks in a deque instead of a vector since we don't need contiguousness.

Processing a completion action per-block.
This commit is contained in:
clemahieu 2017-12-07 22:13:59 -06:00
commit cf32e1677e
2 changed files with 18 additions and 27 deletions

View file

@ -1102,22 +1102,13 @@ void rai::block_processor::process_blocks ()
{
if (!blocks.empty ())
{
{
auto completed (blocks.front ().second);
std::vector <std::shared_ptr <rai::block>> blocks_processing;
while (!blocks.empty () && blocks_processing.size () < rai::blocks_per_transaction)
{
auto info (blocks.front ());
blocks_processing.push_back (info.first);
blocks.pop_front ();
}
// Move first blocks to the end of blocks_processing
std::reverse (blocks_processing.begin (), blocks_processing.end ());
lock.unlock ();
process_receive_many (blocks_processing, completed);
}
// Let other threads get an opportunity to transaction lock
std::this_thread::yield ();
std::deque <std::pair <std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)>>> blocks_processing;
std::swap (blocks, blocks_processing);
lock.unlock ();
std::reverse (blocks_processing.begin (), blocks_processing.end ());
process_receive_many (blocks_processing);
// Let other threads get an opportunity to transaction lock
std::this_thread::yield ();
lock.lock ();
}
else
@ -1132,12 +1123,12 @@ void rai::block_processor::process_blocks ()
void rai::block_processor::process_receive_many (std::shared_ptr <rai::block> block_a, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> completed_a)
{
std::vector <std::shared_ptr <rai::block>> blocks_processing;
blocks_processing.push_back (block_a);
process_receive_many (blocks_processing, completed_a);
std::deque <std::pair <std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)>>> blocks_processing;
blocks_processing.push_back (std::make_pair (block_a, completed_a));
process_receive_many (blocks_processing);
}
void rai::block_processor::process_receive_many (std::vector <std::shared_ptr <rai::block>> blocks_processing, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> completed_a)
void rai::block_processor::process_receive_many (std::deque <std::pair <std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)>>> & blocks_processing)
{
while (!blocks_processing.empty ())
{
@ -1147,16 +1138,16 @@ void rai::block_processor::process_receive_many (std::vector <std::shared_ptr <r
auto count (0);
while (!blocks_processing.empty () && count < rai::blocks_per_transaction)
{
auto block (blocks_processing.back ());
auto item (blocks_processing.back ());
blocks_processing.pop_back ();
auto hash (block->hash ());
auto process_result (process_receive_one (transaction, block));
completed_a (transaction, process_result, block);
auto hash (item.first->hash ());
auto process_result (process_receive_one (transaction, item.first));
item.second (transaction, process_result, item.first);
switch (process_result.code)
{
case rai::process_result::progress:
{
progress.push_back (std::make_pair (block, process_result));
progress.push_back (std::make_pair (item.first, process_result));
}
case rai::process_result::old:
{
@ -1164,7 +1155,7 @@ void rai::block_processor::process_receive_many (std::vector <std::shared_ptr <r
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
node.store.unchecked_del (transaction, hash, **i);
blocks_processing.push_back (std::move (*i));
blocks_processing.push_back (std::make_pair (*i, [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {}));
}
std::lock_guard <std::mutex> lock (node.gap_cache.mutex);
node.gap_cache.blocks.get <1> ().erase (hash);

View file

@ -441,7 +441,7 @@ public:
void flush ();
void add (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
void process_receive_many (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
void process_receive_many (std::vector <std::shared_ptr <rai::block>>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
void process_receive_many (std::deque <std::pair <std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)>>> &);
rai::process_return process_receive_one (MDB_txn *, std::shared_ptr <rai::block>);
void process_blocks ();
private: