Rebroadcast blocks via process RPC (#852)

* The process RPC was reimplementing block processing but didn't properly rebroadcast blocks it processed.  This converts the RPC to just pass the block off to the block processor.  The processing result can be retrieved via logging.

* Fixing formatting.
This commit is contained in:
clemahieu 2018-05-07 22:29:50 +01:00 committed by GitHub
commit 4771642ddf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 51 deletions

View file

@ -1020,7 +1020,13 @@ TEST (rpc, process_block)
system.poll ();
}
ASSERT_EQ (200, response.status);
ASSERT_EQ (send.hash (), system.nodes[0]->latest (rai::test_genesis_key.pub));
auto iterations (0);
while (system.nodes[0]->latest (rai::test_genesis_key.pub) != send.hash ())
{
system.poll ();
++iterations;
ASSERT_LT (iterations, 200);
}
std::string send_hash (response.json.get<std::string> ("hash"));
ASSERT_EQ (send.hash ().to_string (), send_hash);
}
@ -1049,6 +1055,35 @@ TEST (rpc, process_block_no_work)
ASSERT_FALSE (response.json.get<std::string> ("error", "").empty ());
}
TEST (rpc, process_republish)
{
rai::system system (24000, 2);
rai::keypair key;
auto latest (system.nodes[0]->latest (rai::test_genesis_key.pub));
auto & node1 (*system.nodes[0]);
rai::send_block send (latest, key.pub, 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, node1.generate_work (latest));
rai::rpc rpc (system.service, node1, rai::rpc_config (true));
rpc.start ();
boost::property_tree::ptree request;
request.put ("action", "process");
std::string json;
send.serialize_json (json);
request.put ("block", json);
test_response response (request, rpc, system.service);
while (response.status == 0)
{
system.poll ();
}
ASSERT_EQ (200, response.status);
auto iterations (0);
while (system.nodes[1]->latest (rai::test_genesis_key.pub) != send.hash ())
{
system.poll ();
++iterations;
ASSERT_LT (iterations, 200);
}
}
TEST (rpc, keepalive)
{
rai::system system (24000, 1);

View file

@ -1299,30 +1299,7 @@ void rai::block_processor::process_receive_many (std::unique_lock<std::mutex> &
}
}
auto process_result (process_receive_one (transaction, block));
switch (process_result.code)
{
case rai::process_result::progress:
{
if (node.block_arrival.recent (hash))
{
node.active.start (block);
}
}
case rai::process_result::old:
{
auto cached (node.store.unchecked_get (transaction, hash));
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
node.store.unchecked_del (transaction, hash, **i);
add (*i);
}
std::lock_guard<std::mutex> lock (node.gap_cache.mutex);
node.gap_cache.blocks.get<1> ().erase (hash);
break;
}
default:
break;
}
(void)process_result;
lock_a.lock ();
}
}
@ -1332,6 +1309,7 @@ void rai::block_processor::process_receive_many (std::unique_lock<std::mutex> &
rai::process_return rai::block_processor::process_receive_one (MDB_txn * transaction_a, std::shared_ptr<rai::block> block_a)
{
rai::process_return result;
auto hash (block_a->hash ());
result = node.ledger.process (transaction_a, *block_a);
switch (result.code)
{
@ -1341,15 +1319,20 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
std::string block;
block_a->serialize_json (block);
BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1%: %2%") % block_a->hash ().to_string () % block);
BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block);
}
if (node.block_arrival.recent (hash))
{
node.active.start (block_a);
}
queue_unchecked (transaction_a, hash);
break;
}
case rai::process_result::gap_previous:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % hash.to_string ());
}
node.store.unchecked_put (transaction_a, block_a->previous (), block_a);
node.gap_cache.add (transaction_a, block_a);
@ -1359,7 +1342,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % hash.to_string ());
}
node.store.unchecked_put (transaction_a, node.ledger.block_source (transaction_a, *block_a), block_a);
node.gap_cache.add (transaction_a, block_a);
@ -1369,7 +1352,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("State blocks are disabled: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("State blocks are disabled: %1%") % hash.to_string ());
}
node.store.unchecked_put (transaction_a, node.ledger.state_block_parse_canary, block_a);
node.gap_cache.add (transaction_a, block_a);
@ -1381,13 +1364,14 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
BOOST_LOG (node.log) << boost::str (boost::format ("Old for: %1%") % block_a->hash ().to_string ());
}
queue_unchecked (transaction_a, hash);
break;
}
case rai::process_result::bad_signature:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % hash.to_string ());
}
break;
}
@ -1395,7 +1379,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Negative spend for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Negative spend for: %1%") % hash.to_string ());
}
break;
}
@ -1403,7 +1387,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % hash.to_string ());
}
break;
}
@ -1411,20 +1395,20 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Not receive from send for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Not receive from send for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::fork:
{
if (!node.block_arrival.recent (block_a->hash ()))
if (!node.block_arrival.recent (hash))
{
// Only let the bootstrap attempt know about forked blocks that did not arrive via UDP.
node.bootstrap_initiator.process_fork (transaction_a, block_a);
}
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % block_a->hash ().to_string () % block_a->root ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % hash.to_string () % block_a->root ().to_string ());
}
break;
}
@ -1432,20 +1416,20 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Account mismatch for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Account mismatch for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::opened_burn_account:
{
BOOST_LOG (node.log) << boost::str (boost::format ("*** Rejecting open block for burn account ***: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("*** Rejecting open block for burn account ***: %1%") % hash.to_string ());
break;
}
case rai::process_result::balance_mismatch:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Balance mismatch for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Balance mismatch for: %1%") % hash.to_string ());
}
break;
}
@ -1453,7 +1437,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% cannot follow predecessor %2%") % block_a->hash ().to_string () % block_a->previous ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% cannot follow predecessor %2%") % hash.to_string () % block_a->previous ().to_string ());
}
break;
}
@ -1461,6 +1445,18 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
return result;
}
void rai::block_processor::queue_unchecked (MDB_txn * transaction_a, rai::block_hash const & hash_a)
{
auto cached (node.store.unchecked_get (transaction_a, hash_a));
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
node.store.unchecked_del (transaction_a, hash_a, **i);
add (*i);
}
std::lock_guard<std::mutex> lock (node.gap_cache.mutex);
node.gap_cache.blocks.get<1> ().erase (hash_a);
}
rai::node::node (rai::node_init & init_a, boost::asio::io_service & service_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, rai::alarm & alarm_a, rai::logging const & logging_a, rai::work_pool & work_a) :
node (init_a, service_a, application_path_a, alarm_a, rai::node_config (peering_port_a, logging_a), work_a)
{

View file

@ -488,6 +488,7 @@ public:
rai::process_return process_receive_one (MDB_txn *, std::shared_ptr<rai::block>);
private:
void queue_unchecked (MDB_txn *, rai::block_hash const &);
void process_receive_many (std::unique_lock<std::mutex> &);
bool stopped;
bool active;

View file

@ -2596,25 +2596,14 @@ void rai::rpc_handler::process ()
auto hash (block->hash ());
node.block_arrival.add (hash);
rai::process_return result;
std::shared_ptr<rai::block> block_a (std::move (block));
{
rai::transaction transaction (node.store.environment, nullptr, true);
result = node.block_processor.process_receive_one (transaction, block_a);
result = node.block_processor.process_receive_one (transaction, std::move (block));
}
switch (result.code)
{
case rai::process_result::progress:
{
rai::transaction transaction (node.store.environment, nullptr, false);
auto account (node.ledger.account (transaction, hash));
auto amount (node.ledger.amount (transaction, hash));
bool is_state_send (false);
if (auto state = dynamic_cast<rai::state_block *> (block_a.get ()))
{
rai::transaction transaction (node.store.environment, nullptr, false);
is_state_send = node.ledger.is_send (transaction, *state);
}
node.observers.blocks (block_a, account, amount, is_state_send);
boost::property_tree::ptree response_l;
response_l.put ("hash", hash.to_string ());
response (response_l);