Apply bootstrap fork handling uniformly

Blocks that go into unchecked lose the callback
that triggers the fork handling. This patch triggers
it directly, instead of relying on the callback.
This commit is contained in:
Luke Alonso 2018-01-21 07:43:18 -08:00 committed by androm3da
commit e0e72fcf4d
4 changed files with 25 additions and 45 deletions

View file

@ -597,32 +597,7 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec
{
expected = block->previous ();
}
auto attempt_l (connection->attempt);
auto pull_l (pull);
attempt_l->node->block_processor.add (rai::block_processor_item (block, [attempt_l, pull_l](MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr<rai::block> block_a) {
switch (result_a.code)
{
case rai::process_result::progress:
case rai::process_result::old:
break;
case rai::process_result::fork:
{
auto node_l (attempt_l->node);
std::shared_ptr<rai::block> block (node_l->ledger.forked_block (transaction_a, *block_a));
if (!node_l->active.start (transaction_a, block))
{
node_l->network.broadcast_confirm_req (block_a);
node_l->network.broadcast_confirm_req (block);
auto hash (block_a->hash ());
attempt_l->requeue_pull (rai::pull_info (pull_l.account, hash, hash));
BOOST_LOG (node_l->log) << boost::str (boost::format ("While bootstrappping, fork between our block: %2% and block %1% both with root %3%") % block_a->hash ().to_string () % block->hash ().to_string () % block_a->root ().to_string ());
}
break;
}
default:
break;
}
}));
connection->attempt->node->block_processor.add (rai::block_processor_item (block));
receive_block ();
}
else
@ -946,6 +921,17 @@ bool rai::bootstrap_attempt::consume_future (std::future<bool> & future_a)
return result;
}
void rai::bootstrap_attempt::process_fork (MDB_txn * transaction_a, std::shared_ptr<rai::block> block_a)
{
std::shared_ptr<rai::block> ledger_block (node->ledger.forked_block (transaction_a, *block_a));
if (!node->active.start (transaction_a, ledger_block))
{
node->network.broadcast_confirm_req (ledger_block);
node->network.broadcast_confirm_req (block_a);
BOOST_LOG (node->log) << boost::str (boost::format ("While bootstrappping, fork between our block: %2% and block %1% both with root %3%") % ledger_block->hash ().to_string () % block_a->hash ().to_string () % block_a->root ().to_string ());
}
}
void rai::bootstrap_attempt::populate_connections ()
{
if (connections < node->config.bootstrap_connections)
@ -1146,6 +1132,15 @@ void rai::bootstrap_initiator::notify_listeners (bool in_progress_a)
}
}
void rai::bootstrap_initiator::process_fork (MDB_txn * transaction, std::shared_ptr<rai::block> block_a)
{
std::unique_lock<std::mutex> lock (mutex);
if (attempt != nullptr)
{
attempt->process_fork (transaction, block_a);
}
}
rai::bootstrap_listener::bootstrap_listener (boost::asio::io_service & service_a, uint16_t port_a, rai::node & node_a) :
acceptor (service_a),
local (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port_a)),

View file

@ -86,6 +86,7 @@ public:
void stop ();
void requeue_pull (rai::pull_info const &);
bool still_pulling ();
void process_fork (MDB_txn *, std::shared_ptr<rai::block>);
std::deque<std::weak_ptr<rai::bootstrap_client>> clients;
std::weak_ptr<rai::bootstrap_client> connection_frontier_request;
std::weak_ptr<rai::frontier_req_client> frontiers;
@ -175,6 +176,7 @@ public:
void notify_listeners (bool);
void add_observer (std::function<void(bool)> const &);
bool in_progress ();
void process_fork (MDB_txn *, std::shared_ptr<rai::block>);
void stop ();
rai::node & node;
std::shared_ptr<rai::bootstrap_attempt> attempt;

View file

@ -1071,23 +1071,12 @@ bool rai::rep_crawler::exists (rai::block_hash const & hash_a)
}
rai::block_processor_item::block_processor_item (std::shared_ptr<rai::block> block_a) :
block_processor_item (block_a, nullptr, false)
block_processor_item (block_a, false)
{
}
rai::block_processor_item::block_processor_item (std::shared_ptr<rai::block> block_a, bool force_a) :
block_processor_item (block_a, nullptr, force_a)
{
}
rai::block_processor_item::block_processor_item (std::shared_ptr<rai::block> block_a, std::function<void(MDB_txn *, rai::process_return, std::shared_ptr<rai::block>)> callback_a) :
block_processor_item (block_a, callback_a, false)
{
}
rai::block_processor_item::block_processor_item (std::shared_ptr<rai::block> block_a, std::function<void(MDB_txn *, rai::process_return, std::shared_ptr<rai::block>)> callback_a, bool force_a) :
block (block_a),
callback (callback_a),
force (force_a)
{
}
@ -1183,10 +1172,6 @@ void rai::block_processor::process_receive_many (std::deque<rai::block_processor
}
}
auto process_result (process_receive_one (transaction, item.block));
if (item.callback)
{
item.callback (transaction, process_result, item.block);
}
switch (process_result.code)
{
case rai::process_result::progress:
@ -1320,6 +1305,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
}
case rai::process_result::fork:
{
node.bootstrap_initiator.process_fork (transaction_a, block_a);
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % block_a->hash ().to_string () % block_a->root ().to_string ());

View file

@ -441,10 +441,7 @@ class block_processor_item
public:
block_processor_item (std::shared_ptr<rai::block>);
block_processor_item (std::shared_ptr<rai::block>, bool);
block_processor_item (std::shared_ptr<rai::block>, std::function<void(MDB_txn *, rai::process_return, std::shared_ptr<rai::block>)>);
block_processor_item (std::shared_ptr<rai::block>, std::function<void(MDB_txn *, rai::process_return, std::shared_ptr<rai::block>)>, bool);
std::shared_ptr<rai::block> block;
std::function<void(MDB_txn *, rai::process_return, std::shared_ptr<rai::block>)> callback;
bool force;
};
// Processing blocks is a potentially long IO operation