From 4771642ddf2be01082bdfdec2efa3544338fbf4c Mon Sep 17 00:00:00 2001 From: clemahieu Date: Mon, 7 May 2018 22:29:50 +0100 Subject: [PATCH] Rebroadcast blocks via process RPC (#852) * The process RPC was reimplementing block processing but didn't properly rebroadcast blocks it processed. This converts the RPC to just pass the block off to the block processor. The processing result can be retrieved via logging. * Fixing formatting. --- rai/core_test/rpc.cpp | 37 +++++++++++++++++++++- rai/node/node.cpp | 72 ++++++++++++++++++++----------------------- rai/node/node.hpp | 1 + rai/node/rpc.cpp | 13 +------- 4 files changed, 72 insertions(+), 51 deletions(-) diff --git a/rai/core_test/rpc.cpp b/rai/core_test/rpc.cpp index c5bc9787..fe00326c 100644 --- a/rai/core_test/rpc.cpp +++ b/rai/core_test/rpc.cpp @@ -1020,7 +1020,13 @@ TEST (rpc, process_block) system.poll (); } ASSERT_EQ (200, response.status); - ASSERT_EQ (send.hash (), system.nodes[0]->latest (rai::test_genesis_key.pub)); + auto iterations (0); + while (system.nodes[0]->latest (rai::test_genesis_key.pub) != send.hash ()) + { + system.poll (); + ++iterations; + ASSERT_LT (iterations, 200); + } std::string send_hash (response.json.get ("hash")); ASSERT_EQ (send.hash ().to_string (), send_hash); } @@ -1049,6 +1055,35 @@ TEST (rpc, process_block_no_work) ASSERT_FALSE (response.json.get ("error", "").empty ()); } +TEST (rpc, process_republish) +{ + rai::system system (24000, 2); + rai::keypair key; + auto latest (system.nodes[0]->latest (rai::test_genesis_key.pub)); + auto & node1 (*system.nodes[0]); + rai::send_block send (latest, key.pub, 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, node1.generate_work (latest)); + rai::rpc rpc (system.service, node1, rai::rpc_config (true)); + rpc.start (); + boost::property_tree::ptree request; + request.put ("action", "process"); + std::string json; + send.serialize_json (json); + request.put ("block", json); + test_response response (request, rpc, system.service); + while (response.status == 0) + { + system.poll (); + } + ASSERT_EQ (200, response.status); + auto iterations (0); + while (system.nodes[1]->latest (rai::test_genesis_key.pub) != send.hash ()) + { + system.poll (); + ++iterations; + ASSERT_LT (iterations, 200); + } +} + TEST (rpc, keepalive) { rai::system system (24000, 1); diff --git a/rai/node/node.cpp b/rai/node/node.cpp index 547630b3..206d641b 100644 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -1299,30 +1299,7 @@ void rai::block_processor::process_receive_many (std::unique_lock & } } auto process_result (process_receive_one (transaction, block)); - switch (process_result.code) - { - case rai::process_result::progress: - { - if (node.block_arrival.recent (hash)) - { - node.active.start (block); - } - } - case rai::process_result::old: - { - auto cached (node.store.unchecked_get (transaction, hash)); - for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i) - { - node.store.unchecked_del (transaction, hash, **i); - add (*i); - } - std::lock_guard lock (node.gap_cache.mutex); - node.gap_cache.blocks.get<1> ().erase (hash); - break; - } - default: - break; - } + (void)process_result; lock_a.lock (); } } @@ -1332,6 +1309,7 @@ void rai::block_processor::process_receive_many (std::unique_lock & rai::process_return rai::block_processor::process_receive_one (MDB_txn * transaction_a, std::shared_ptr block_a) { rai::process_return result; + auto hash (block_a->hash ()); result = node.ledger.process (transaction_a, *block_a); switch (result.code) { @@ -1341,15 +1319,20 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac { std::string block; block_a->serialize_json (block); - BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1%: %2%") % block_a->hash ().to_string () % block); + BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block); } + if (node.block_arrival.recent (hash)) + { + node.active.start (block_a); + } + queue_unchecked (transaction_a, hash); break; } case rai::process_result::gap_previous: { if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % block_a->hash ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % hash.to_string ()); } node.store.unchecked_put (transaction_a, block_a->previous (), block_a); node.gap_cache.add (transaction_a, block_a); @@ -1359,7 +1342,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac { if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % block_a->hash ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % hash.to_string ()); } node.store.unchecked_put (transaction_a, node.ledger.block_source (transaction_a, *block_a), block_a); node.gap_cache.add (transaction_a, block_a); @@ -1369,7 +1352,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac { if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("State blocks are disabled: %1%") % block_a->hash ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("State blocks are disabled: %1%") % hash.to_string ()); } node.store.unchecked_put (transaction_a, node.ledger.state_block_parse_canary, block_a); node.gap_cache.add (transaction_a, block_a); @@ -1381,13 +1364,14 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac { BOOST_LOG (node.log) << boost::str (boost::format ("Old for: %1%") % block_a->hash ().to_string ()); } + queue_unchecked (transaction_a, hash); break; } case rai::process_result::bad_signature: { if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % block_a->hash ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % hash.to_string ()); } break; } @@ -1395,7 +1379,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac { if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("Negative spend for: %1%") % block_a->hash ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("Negative spend for: %1%") % hash.to_string ()); } break; } @@ -1403,7 +1387,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac { if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % block_a->hash ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % hash.to_string ()); } break; } @@ -1411,20 +1395,20 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac { if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("Not receive from send for: %1%") % block_a->hash ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("Not receive from send for: %1%") % hash.to_string ()); } break; } case rai::process_result::fork: { - if (!node.block_arrival.recent (block_a->hash ())) + if (!node.block_arrival.recent (hash)) { // Only let the bootstrap attempt know about forked blocks that did not arrive via UDP. node.bootstrap_initiator.process_fork (transaction_a, block_a); } if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % block_a->hash ().to_string () % block_a->root ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % hash.to_string () % block_a->root ().to_string ()); } break; } @@ -1432,20 +1416,20 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac { if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("Account mismatch for: %1%") % block_a->hash ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("Account mismatch for: %1%") % hash.to_string ()); } break; } case rai::process_result::opened_burn_account: { - BOOST_LOG (node.log) << boost::str (boost::format ("*** Rejecting open block for burn account ***: %1%") % block_a->hash ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("*** Rejecting open block for burn account ***: %1%") % hash.to_string ()); break; } case rai::process_result::balance_mismatch: { if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("Balance mismatch for: %1%") % block_a->hash ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("Balance mismatch for: %1%") % hash.to_string ()); } break; } @@ -1453,7 +1437,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac { if (node.config.logging.ledger_logging ()) { - BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% cannot follow predecessor %2%") % block_a->hash ().to_string () % block_a->previous ().to_string ()); + BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% cannot follow predecessor %2%") % hash.to_string () % block_a->previous ().to_string ()); } break; } @@ -1461,6 +1445,18 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac return result; } +void rai::block_processor::queue_unchecked (MDB_txn * transaction_a, rai::block_hash const & hash_a) +{ + auto cached (node.store.unchecked_get (transaction_a, hash_a)); + for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i) + { + node.store.unchecked_del (transaction_a, hash_a, **i); + add (*i); + } + std::lock_guard lock (node.gap_cache.mutex); + node.gap_cache.blocks.get<1> ().erase (hash_a); +} + rai::node::node (rai::node_init & init_a, boost::asio::io_service & service_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, rai::alarm & alarm_a, rai::logging const & logging_a, rai::work_pool & work_a) : node (init_a, service_a, application_path_a, alarm_a, rai::node_config (peering_port_a, logging_a), work_a) { diff --git a/rai/node/node.hpp b/rai/node/node.hpp index 24292b8c..9e964e5a 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -488,6 +488,7 @@ public: rai::process_return process_receive_one (MDB_txn *, std::shared_ptr); private: + void queue_unchecked (MDB_txn *, rai::block_hash const &); void process_receive_many (std::unique_lock &); bool stopped; bool active; diff --git a/rai/node/rpc.cpp b/rai/node/rpc.cpp index 9e46c999..76ab4e4e 100644 --- a/rai/node/rpc.cpp +++ b/rai/node/rpc.cpp @@ -2596,25 +2596,14 @@ void rai::rpc_handler::process () auto hash (block->hash ()); node.block_arrival.add (hash); rai::process_return result; - std::shared_ptr block_a (std::move (block)); { rai::transaction transaction (node.store.environment, nullptr, true); - result = node.block_processor.process_receive_one (transaction, block_a); + result = node.block_processor.process_receive_one (transaction, std::move (block)); } switch (result.code) { case rai::process_result::progress: { - rai::transaction transaction (node.store.environment, nullptr, false); - auto account (node.ledger.account (transaction, hash)); - auto amount (node.ledger.amount (transaction, hash)); - bool is_state_send (false); - if (auto state = dynamic_cast (block_a.get ())) - { - rai::transaction transaction (node.store.environment, nullptr, false); - is_state_send = node.ledger.is_send (transaction, *state); - } - node.observers.blocks (block_a, account, amount, is_state_send); boost::property_tree::ptree response_l; response_l.put ("hash", hash.to_string ()); response (response_l);