Process wallet blocks outside of block processor queue (#2196)

* Process wallet blocks outside of block processor queue

* return block_processor.process_one(...)
This commit is contained in:
Sergey Kroshnin 2019-08-06 18:52:32 +03:00 committed by Guilherme Lawless
commit 1a946f5545
6 changed files with 48 additions and 21 deletions

View file

@ -92,6 +92,12 @@ void nano::block_processor::force (std::shared_ptr<nano::block> block_a)
condition.notify_all ();
}
void nano::block_processor::wait_write ()
{
std::lock_guard<std::mutex> lock (mutex);
awaiting_write = true;
}
void nano::block_processor::process_blocks ()
{
std::unique_lock<std::mutex> lock (mutex);
@ -249,7 +255,7 @@ void nano::block_processor::process_batch (std::unique_lock<std::mutex> & lock_a
// Processing blocks
auto first_time (true);
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
while ((!blocks.empty () || !forced.empty ()) && (timer_l.before_deadline (node.config.block_processor_batch_max_time) || (number_of_blocks_processed < node.flags.block_processor_batch_size)))
while ((!blocks.empty () || !forced.empty ()) && (timer_l.before_deadline (node.config.block_processor_batch_max_time) || (number_of_blocks_processed < node.flags.block_processor_batch_size)) && !awaiting_write)
{
auto log_this_record (false);
if (node.config.logging.timing_logging ())
@ -339,6 +345,7 @@ void nano::block_processor::process_batch (std::unique_lock<std::mutex> & lock_a
verify_state_blocks (transaction, lock_a, 256 * (node.config.signature_checker_threads + 1));
}
}
awaiting_write = false;
lock_a.unlock ();
if (node.config.logging.timing_logging () && number_of_blocks_processed != 0)

View file

@ -41,6 +41,7 @@ public:
void add (nano::unchecked_info const &);
void add (std::shared_ptr<nano::block>, uint64_t = 0);
void force (std::shared_ptr<nano::block>);
void wait_write ();
bool should_log (bool);
bool have_blocks ();
void process_blocks ();
@ -57,6 +58,7 @@ private:
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>);
bool stopped;
bool active;
bool awaiting_write{ false };
std::chrono::steady_clock::time_point next_log;
std::deque<nano::unchecked_info> state_blocks;
std::deque<nano::unchecked_info> blocks;

View file

@ -2935,20 +2935,12 @@ void nano::json_handler::process ()
{
if (!nano::work_validate (*block))
{
auto hash (block->hash ());
node.block_arrival.add (hash);
nano::process_return result;
{
auto transaction (node.store.tx_begin_write ());
// Set current time to trigger automatic rebroadcast and election
nano::unchecked_info info (block, block->account (), nano::seconds_since_epoch (), nano::signature_verification::unknown);
result = node.block_processor.process_one (transaction, info);
}
auto result (node.process_local (block));
switch (result.code)
{
case nano::process_result::progress:
{
response_l.put ("hash", hash.to_string ());
response_l.put ("hash", block->hash ().to_string ());
break;
}
case nano::process_result::gap_previous:
@ -2999,7 +2991,7 @@ void nano::json_handler::process ()
{
node.active.erase (*block);
node.block_processor.force (block);
response_l.put ("hash", hash.to_string ());
response_l.put ("hash", block->hash ().to_string ());
}
else
{

View file

@ -608,6 +608,19 @@ nano::process_return nano::node::process (nano::block const & block_a)
return result;
}
nano::process_return nano::node::process_local (std::shared_ptr<nano::block> block_a)
{
// Add block hash as recently arrived to trigger automatic rebroadcast and election
block_arrival.add (block_a->hash ());
// Set current time to trigger automatic rebroadcast and election
nano::unchecked_info info (block_a, block_a->account (), nano::seconds_since_epoch (), nano::signature_verification::unknown);
// Notify block processor to release write lock
block_processor.wait_write ();
// Process block
auto transaction (store.tx_begin_write ());
return block_processor.process_one (transaction, info);
}
void nano::node::start ()
{
network.start ();

View file

@ -105,6 +105,7 @@ public:
void process_confirmed (nano::election_status const &, uint8_t = 0);
void process_active (std::shared_ptr<nano::block>);
nano::process_return process (nano::block const &);
nano::process_return process_local (std::shared_ptr<nano::block>);
void keepalive_preconfigured (std::vector<std::string> const &);
nano::block_hash latest (nano::account const &);
nano::uint128_t balance (nano::account const &);

View file

@ -981,12 +981,16 @@ std::shared_ptr<nano::block> nano::wallet::receive_action (nano::block const & s
wallets.node.work_generate_blocking (*block, wallets.node.active.active_difficulty ());
}
wallets.watcher.add (block);
wallets.node.process_active (block);
wallets.node.block_processor.flush ();
if (generate_work_a)
bool error (wallets.node.process_local (block).code != nano::process_result::progress);
if (!error && generate_work_a)
{
work_ensure (account, block->hash ());
}
// Return null block after ledger process error
if (error)
{
block = nullptr;
}
}
return block;
}
@ -1026,12 +1030,16 @@ std::shared_ptr<nano::block> nano::wallet::change_action (nano::account const &
wallets.node.work_generate_blocking (*block, wallets.node.active.active_difficulty ());
}
wallets.watcher.add (block);
wallets.node.process_active (block);
wallets.node.block_processor.flush ();
if (generate_work_a)
bool error (wallets.node.process_local (block).code != nano::process_result::progress);
if (!error && generate_work_a)
{
work_ensure (source_a, block->hash ());
}
// Return null block after ledger process error
if (error)
{
block = nullptr;
}
}
return block;
}
@ -1136,12 +1144,16 @@ std::shared_ptr<nano::block> nano::wallet::send_action (nano::account const & so
wallets.node.work_generate_blocking (*block, wallets.node.active.active_difficulty ());
}
wallets.watcher.add (block);
wallets.node.process_active (block);
wallets.node.block_processor.flush ();
if (generate_work_a)
error = (wallets.node.process_local (block).code != nano::process_result::progress);
if (!error && generate_work_a)
{
work_ensure (source_a, block->hash ());
}
// Return null block after ledger process error
if (error)
{
block = nullptr;
}
}
return block;
}