diff --git a/rai/core_test/network.cpp b/rai/core_test/network.cpp index 37e8cf65..669275dc 100644 --- a/rai/core_test/network.cpp +++ b/rai/core_test/network.cpp @@ -194,7 +194,7 @@ TEST (network, send_valid_confirm_ack) rai::block_hash latest1 (system.nodes [0]->latest (rai::test_genesis_key.pub)); rai::send_block block2 (latest1, key2.pub, 50, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest1)); rai::block_hash latest2 (system.nodes [1]->latest (rai::test_genesis_key.pub)); - system.nodes [0]->process_receive_republish (std::unique_ptr (new rai::send_block (block2))); + system.nodes [0]->process_active (std::unique_ptr (new rai::send_block (block2))); auto iterations (0); // Keep polling until latest block changes while (system.nodes [1]->latest (rai::test_genesis_key.pub) == latest2) @@ -217,7 +217,7 @@ TEST (network, send_valid_publish) rai::send_block block2 (latest1, key2.pub, 50, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest1)); auto hash2 (block2.hash ()); rai::block_hash latest2 (system.nodes [1]->latest (rai::test_genesis_key.pub)); - system.nodes [1]->process_receive_republish (std::unique_ptr (new rai::send_block (block2))); + system.nodes [1]->process_active (std::unique_ptr (new rai::send_block (block2))); auto iterations (0); while (system.nodes [0]->network.incoming.publish == 0) { @@ -302,8 +302,8 @@ TEST (receivable_processor, send_with_receive) ASSERT_EQ (0, system.nodes [0]->balance (key2.pub)); ASSERT_EQ (amount, system.nodes [1]->balance (rai::test_genesis_key.pub)); ASSERT_EQ (0, system.nodes [1]->balance (key2.pub)); - system.nodes [0]->process_receive_republish (block1); - system.nodes [1]->process_receive_republish (block1); + system.nodes [0]->process_active (block1); + system.nodes [1]->process_active (block1); ASSERT_EQ (amount - system.nodes [0]->config.receive_minimum.number (), system.nodes [0]->balance (rai::test_genesis_key.pub)); ASSERT_EQ (0, system.nodes [0]->balance (key2.pub)); ASSERT_EQ (amount - system.nodes [0]->config.receive_minimum.number (), system.nodes [1]->balance (rai::test_genesis_key.pub)); @@ -515,6 +515,7 @@ TEST (bootstrap_processor, process_one) ++iterations; ASSERT_LT (iterations, 200); } + ASSERT_EQ (0, node1->active.roots.size ()); node1->stop (); } diff --git a/rai/core_test/node.cpp b/rai/core_test/node.cpp index 44ebddad..1c618912 100644 --- a/rai/core_test/node.cpp +++ b/rai/core_test/node.cpp @@ -163,8 +163,8 @@ TEST (node, send_out_of_order) rai::genesis genesis; rai::send_block send1 (genesis.hash (), key2.pub, std::numeric_limits ::max () - system.nodes [0]->config.receive_minimum.number (), rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())); rai::send_block send2 (send1.hash (), key2.pub, std::numeric_limits ::max () - system.nodes [0]->config.receive_minimum.number () * 2, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (send1.hash ())); - system.nodes [0]->process_receive_republish (std::unique_ptr (new rai::send_block (send2))); - system.nodes [0]->process_receive_republish (std::unique_ptr (new rai::send_block (send1))); + system.nodes [0]->process_active (std::unique_ptr (new rai::send_block (send2))); + system.nodes [0]->process_active (std::unique_ptr (new rai::send_block (send1))); auto iterations (0); while (std::any_of (system.nodes.begin (), system.nodes.end (), [&] (std::shared_ptr const & node_a) {return node_a->balance (rai::test_genesis_key.pub) != rai::genesis_amount - system.nodes [0]->config.receive_minimum.number () * 2;})) { @@ -181,7 +181,7 @@ TEST (node, quick_confirm) rai::block_hash previous (system.nodes [0]->latest (rai::test_genesis_key.pub)); system.wallet (0)->insert_adhoc (key.prv); auto send (std::make_shared (previous, key.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (previous))); - system.nodes [0]->process_receive_republish (send); + system.nodes [0]->process_active (send); auto iterations (0); while (system.nodes [0]->balance (key.pub).is_zero ()) { @@ -707,13 +707,13 @@ TEST (node, fork_publish) auto send1 (std::make_shared (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); rai::keypair key2; auto send2 (std::make_shared (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); - node1.process_receive_republish (send1); + node1.process_active (send1); ASSERT_EQ (1, node1.active.roots.size ()); auto existing (node1.active.roots.find (send1->root ())); ASSERT_NE (node1.active.roots.end (), existing); auto election (existing->election); ASSERT_EQ (2, election->votes.rep_votes.size ()); - node1.process_receive_republish (send2); + node1.process_active (send2); auto existing1 (election->votes.rep_votes.find (rai::test_genesis_key.pub)); ASSERT_NE (election->votes.rep_votes.end (), existing1); ASSERT_EQ (*send1, *existing1->second); @@ -738,12 +738,12 @@ TEST (node, fork_keep) // send1 and send2 fork to different accounts auto send1 (std::make_shared (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ()))); auto send2 (std::make_shared (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ()))); - node1.process_receive_republish (send1); - node2.process_receive_republish (send1); + node1.process_active (send1); + node2.process_active (send1); ASSERT_EQ (1, node1.active.roots.size ()); ASSERT_EQ (1, node2.active.roots.size ()); - node1.process_receive_republish (send2); - node2.process_receive_republish (send2); + node1.process_active (send2); + node2.process_active (send2); auto conflict (node2.active.roots.find (genesis.hash ())); ASSERT_NE (node2.active.roots.end (), conflict); auto votes1 (conflict->election); @@ -960,22 +960,22 @@ TEST (node, fork_open_flip) rai::keypair rep1; rai::keypair rep2; auto send1 (std::make_shared (genesis.hash (), key1.pub, rai::genesis_amount - 1, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ()))); - node1.process_receive_republish (send1); - node2.process_receive_republish (send1); + node1.process_active (send1); + node2.process_active (send1); // We should be keeping this block auto open1 (std::make_shared (send1->hash (), rep1.pub, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub))); // This block should be evicted auto open2 (std::make_shared (send1->hash (), rep2.pub, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub))); ASSERT_FALSE (*open1 == *open2); // node1 gets copy that will remain - node1.process_receive_republish (open1); + node1.process_active (open1); // node2 gets copy that will be evicted - node2.process_receive_republish (open2); + node2.process_active (open2); ASSERT_EQ (2, node1.active.roots.size ()); ASSERT_EQ (2, node2.active.roots.size ()); // Notify both nodes that a fork exists - node1.process_receive_republish (open2); - node2.process_receive_republish (open1); + node1.process_active (open2); + node2.process_active (open1); auto conflict (node2.active.roots.find (open1->root ())); ASSERT_NE (node2.active.roots.end (), conflict); auto votes1 (conflict->election); @@ -1112,12 +1112,12 @@ TEST (node, broadcast_elected) system.wallet (2)->insert_adhoc (rep_other.prv); auto fork0 (std::make_shared (node2->latest (rai::test_genesis_key.pub), rep_small.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); node0->generate_work (*fork0); - node0->process_receive_republish (fork0); - node1->process_receive_republish (fork0); + node0->process_active (fork0); + node1->process_active (fork0); auto fork1 (std::make_shared (node2->latest (rai::test_genesis_key.pub), rep_big.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); node0->generate_work (*fork1); system.wallet (2)->insert_adhoc (rep_small.prv); - node2->process_receive_republish (fork1); + node2->process_active (fork1); //std::cerr << "fork0: " << fork_hash.to_string () << std::endl; //std::cerr << "fork1: " << fork1.hash ().to_string () << std::endl; auto iterations (0); diff --git a/rai/core_test/work_pool.cpp b/rai/core_test/work_pool.cpp index e99c4a4d..aef1624b 100644 --- a/rai/core_test/work_pool.cpp +++ b/rai/core_test/work_pool.cpp @@ -52,7 +52,7 @@ TEST (work, cancel_many) pool.cancel (key1); } -TEST (work, opencl) +TEST (work, DISABLED_opencl) { rai::logging logging; logging.init (rai::unique_path ()); diff --git a/rai/node/bootstrap.cpp b/rai/node/bootstrap.cpp index 9631632e..55065adb 100755 --- a/rai/node/bootstrap.cpp +++ b/rai/node/bootstrap.cpp @@ -1548,7 +1548,7 @@ void rai::bulk_push_server::received_block (boost::system::error_code const & ec { if (!connection->node->bootstrap_initiator.in_progress ()) { - connection->node->process_receive_republish (std::move (block)); + connection->node->process_active (std::move (block)); } receive (); } diff --git a/rai/node/node.cpp b/rai/node/node.cpp index 00f0fa27..f66dd060 100755 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -358,8 +358,7 @@ public: ++node.network.incoming.publish; node.peers.contacted (sender, message_a.version_using); node.peers.insert (sender, message_a.version_using); - node.block_arrival.add (message_a.block->hash ()); - node.process_receive_republish (message_a.block); + node.process_active (message_a.block); } void confirm_req (rai::confirm_req const & message_a) override { @@ -370,8 +369,7 @@ public: ++node.network.incoming.confirm_req; node.peers.contacted (sender, message_a.version_using); node.peers.insert (sender, message_a.version_using); - node.block_arrival.add (message_a.block->hash ()); - node.process_receive_republish (message_a.block); + node.process_active (message_a.block); rai::transaction transaction_a (node.store.environment, nullptr, false); if (node.store.block_exists (transaction_a, message_a.block->hash ())) { @@ -387,8 +385,7 @@ public: ++node.network.incoming.confirm_ack; node.peers.contacted (sender, message_a.version_using); node.peers.insert (sender, message_a.version_using); - node.block_arrival.add (message_a.vote.block->hash ()); - node.process_receive_republish (message_a.vote.block); + node.process_active (message_a.vote.block); node.vote_processor.vote (message_a.vote, sender); } void bulk_pull (rai::bulk_pull const &) override @@ -1317,6 +1314,14 @@ block_processor (*this) observers.disconnect (); }; observers.blocks.add ([this] (std::shared_ptr block_a, rai::account const & account_a, rai::amount const & amount_a) + { + if (this->block_arrival.recent (block_a->hash ())) + { + rai::transaction transaction (store.environment, nullptr, true); + active.start (transaction, block_a); + } + }); + observers.blocks.add ([this] (std::shared_ptr block_a, rai::account const & account_a, rai::amount const & amount_a) { if (this->block_arrival.recent (block_a->hash ())) { @@ -1588,29 +1593,14 @@ void rai::network::confirm_send (rai::confirm_ack const & confirm_a, std::shared }); } -void rai::node::process_receive_republish (std::shared_ptr incoming) +void rai::node::process_active (std::shared_ptr incoming) { - assert (incoming != nullptr); - auto node_l (shared_from_this ()); - block_processor.add (incoming, [node_l] (MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr block_a) - { - switch (result_a.code) - { - case rai::process_result::progress: - { - node_l->active.start (transaction_a, block_a); - break; - } - default: - { - break; - } - } - }); - if (rai::rai_network == rai::rai_networks::rai_test_network) - { - block_processor.flush (); - } + block_arrival.add (incoming->hash ()); + block_processor.process_receive_many (incoming); + if (rai::rai_network == rai::rai_networks::rai_test_network) + { + block_processor.flush (); + } } rai::process_return rai::node::process (rai::block const & block_a) @@ -2258,15 +2248,16 @@ void rai::block_arrival::add (rai::block_hash const & hash_a) std::lock_guard lock (mutex); auto now (std::chrono::system_clock::now ()); arrival.insert (rai::block_arrival_info {now, hash_a}); - while (!arrival.empty () && arrival.begin ()->arrival + std::chrono::seconds (60) < now) - { - arrival.erase (arrival.begin ()); - } } bool rai::block_arrival::recent (rai::block_hash const & hash_a) { - std::lock_guard lock (mutex); + std::lock_guard lock (mutex); + auto now (std::chrono::system_clock::now ()); + while (!arrival.empty () && arrival.begin ()->arrival + std::chrono::seconds (60) < now) + { + arrival.erase (arrival.begin ()); + } return arrival.get <1> ().find (hash_a) != arrival.get <1> ().end (); } diff --git a/rai/node/node.hpp b/rai/node/node.hpp index ee9824a2..da1050af 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -439,8 +439,8 @@ public: ~block_processor (); void stop (); void flush (); - void add (std::shared_ptr , std::function )> = [] (MDB_txn *, rai::process_return, std::shared_ptr ) {}); - void process_receive_many (std::shared_ptr , std::function )> = [] (MDB_txn *, rai::process_return, std::shared_ptr ) {}); + void add (std::shared_ptr , std::function )> = [] (MDB_txn *, rai::process_return, std::shared_ptr ) {}); + void process_receive_many (std::shared_ptr , std::function )> = [] (MDB_txn *, rai::process_return, std::shared_ptr ) {}); rai::process_return process_receive_one (MDB_txn *, std::shared_ptr ); private: void process_blocks (); @@ -471,7 +471,7 @@ public: int store_version (); void process_confirmed (std::shared_ptr ); void process_message (rai::message &, rai::endpoint const &); - void process_receive_republish (std::shared_ptr ); + void process_active (std::shared_ptr ); rai::process_return process (rai::block const &); void keepalive_preconfigured (std::vector const &); rai::block_hash latest (rai::account const &); diff --git a/rai/node/rpc.cpp b/rai/node/rpc.cpp index 97130b16..e90b659a 100755 --- a/rai/node/rpc.cpp +++ b/rai/node/rpc.cpp @@ -1788,7 +1788,7 @@ void rai::rpc_handler::process () { if (!node.work.work_validate (*block)) { - node.process_receive_republish (std::move (block)); + node.process_active (std::move (block)); boost::property_tree::ptree response_l; response (response_l); } diff --git a/rai/node/wallet.cpp b/rai/node/wallet.cpp index 9c3dc39c..88665ec6 100644 --- a/rai/node/wallet.cpp +++ b/rai/node/wallet.cpp @@ -1035,7 +1035,7 @@ std::shared_ptr rai::wallet::receive_action (rai::send_block const if (block != nullptr) { assert (block != nullptr); - node.process_receive_republish (block); + node.process_active (block); auto hash (block->hash ()); auto this_l (shared_from_this ()); auto source (send_a.hashables.destination); @@ -1073,7 +1073,7 @@ std::shared_ptr rai::wallet::change_action (rai::account const & so if (block != nullptr) { assert (block != nullptr); - node.process_receive_republish (block); + node.process_active (block); auto hash (block->hash ()); auto this_l (shared_from_this ()); node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash] @@ -1114,7 +1114,7 @@ std::shared_ptr rai::wallet::send_action (rai::account const & sour if (block != nullptr) { assert (block != nullptr); - node.process_receive_republish (block); + node.process_active (block); auto hash (block->hash ()); auto this_l (shared_from_this ()); node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash] diff --git a/rai/qt/qt.cpp b/rai/qt/qt.cpp index 63c732e5..cae31214 100755 --- a/rai/qt/qt.cpp +++ b/rai/qt/qt.cpp @@ -1673,7 +1673,7 @@ wallet (wallet_a) { show_label_ok (*status); this->status->setText (""); - this->wallet.node.process_receive_republish (std::move (block_l)); + this->wallet.node.process_active (std::move (block_l)); } else {