Requeueing pull inside bootstrap block_processor lambda because that doesn't require extra functions or locking on bootstrap_initiator. This also has less complicated locking.

Logging when we're flushing blocks.
This commit is contained in:
clemahieu 2017-12-25 16:46:46 -06:00
commit 91816197ca
3 changed files with 14 additions and 25 deletions

View file

@ -592,7 +592,8 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec
expected = block->previous ();
}
auto attempt_l (connection->attempt);
attempt_l->node->block_processor.add (rai::block_processor_item (block, [attempt_l] (MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr <rai::block> block_a)
auto pull_l (pull);
attempt_l->node->block_processor.add (rai::block_processor_item (block, [attempt_l, pull_l] (MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr <rai::block> block_a)
{
switch (result_a.code)
{
@ -606,6 +607,8 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec
node_l->active.start (transaction_a, block);
node_l->network.broadcast_confirm_req (block_a);
node_l->network.broadcast_confirm_req (block);
auto hash (block_a->hash ());
attempt_l->requeue_pull (rai::pull_info (pull_l.account, hash, hash));
BOOST_LOG (node_l->log) << boost::str (boost::format ("Fork received in bootstrap between: %1% and %2% root %3%") % block_a->hash ().to_string () % block->hash ().to_string () % block_a->root ().to_string ());
break;
}
@ -891,7 +894,9 @@ void rai::bootstrap_attempt::run ()
}
}
// Flushing may resolve forks which can add more pulls
BOOST_LOG (node->log) << "Flushing unchecked blocks";
node->block_processor.flush ();
BOOST_LOG (node->log) << "Finished flushing unchecked blocks";
}
if (!stopped)
{
@ -1041,7 +1046,7 @@ stopped (false)
rai::bootstrap_initiator::~bootstrap_initiator ()
{
stop ();
stop ();
}
void rai::bootstrap_initiator::bootstrap ()
@ -1049,26 +1054,26 @@ void rai::bootstrap_initiator::bootstrap ()
std::unique_lock <std::mutex> lock (mutex);
if (!stopped && attempt == nullptr)
{
stop_attempt (lock);
attempt = std::make_shared <rai::bootstrap_attempt> (node.shared ());
stop_attempt (lock);
attempt = std::make_shared <rai::bootstrap_attempt> (node.shared ());
attempt_thread.reset (new std::thread ([this] ()
{
attempt->run ();
{
attempt->run ();
std::lock_guard <std::mutex> lock (mutex);
attempt.reset ();
condition.notify_all ();
}));
}));
}
}
void rai::bootstrap_initiator::bootstrap (rai::endpoint const & endpoint_a)
{
node.peers.insert (endpoint_a, 0x5);
node.peers.insert (endpoint_a, 0x5);
bootstrap ();
std::lock_guard <std::mutex> lock (mutex);
if (attempt != nullptr)
{
attempt->add_connection (endpoint_a);
attempt->add_connection (endpoint_a);
}
}
@ -1078,20 +1083,6 @@ void rai::bootstrap_initiator::add_observer (std::function <void (bool)> const &
observers.push_back (observer_a);
}
void rai::bootstrap_initiator::request_account (rai::account const & account_a, rai::block_hash const & hash_a)
{
std::shared_ptr <rai::bootstrap_attempt> attempt_l;
{
std::lock_guard <std::mutex> lock (mutex);
attempt_l = attempt;
}
if (attempt_l != nullptr)
{
std::lock_guard <std::mutex> lock (mutex);
attempt_l->pulls.push_back (rai::pull_info (account_a, hash_a, hash_a));
}
}
bool rai::bootstrap_initiator::in_progress ()
{
std::lock_guard <std::mutex> lock (mutex);

View file

@ -163,7 +163,6 @@ public:
void bootstrap ();
void notify_listeners (bool);
void add_observer (std::function <void (bool)> const &);
void request_account (rai::account const &, rai::block_hash const &);
bool in_progress ();
void stop ();
rai::node & node;

View file

@ -1170,7 +1170,6 @@ void rai::block_processor::process_receive_many (std::deque <rai::block_processo
{
// Replace our block with the winner and roll back any dependent blocks
BOOST_LOG (node.log) << boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ());
node.bootstrap_initiator.request_account (node.ledger.account (transaction, successor->hash ()), hash);
node.ledger.rollback (transaction, successor->hash ());
}
}