Using block_arrival class to track and filter blocks that are probably live from blocks that arrive from bootstrapping. This allows a more straight-forward way to choose to republish blocks only if they're live.

This commit is contained in:
clemahieu 2017-09-16 18:05:14 -05:00
commit 2a50b2e10f
9 changed files with 57 additions and 65 deletions

View file

@ -194,7 +194,7 @@ TEST (network, send_valid_confirm_ack)
rai::block_hash latest1 (system.nodes [0]->latest (rai::test_genesis_key.pub));
rai::send_block block2 (latest1, key2.pub, 50, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest1));
rai::block_hash latest2 (system.nodes [1]->latest (rai::test_genesis_key.pub));
system.nodes [0]->process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (block2)));
system.nodes [0]->process_active (std::unique_ptr <rai::block> (new rai::send_block (block2)));
auto iterations (0);
// Keep polling until latest block changes
while (system.nodes [1]->latest (rai::test_genesis_key.pub) == latest2)
@ -217,7 +217,7 @@ TEST (network, send_valid_publish)
rai::send_block block2 (latest1, key2.pub, 50, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest1));
auto hash2 (block2.hash ());
rai::block_hash latest2 (system.nodes [1]->latest (rai::test_genesis_key.pub));
system.nodes [1]->process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (block2)));
system.nodes [1]->process_active (std::unique_ptr <rai::block> (new rai::send_block (block2)));
auto iterations (0);
while (system.nodes [0]->network.incoming.publish == 0)
{
@ -302,8 +302,8 @@ TEST (receivable_processor, send_with_receive)
ASSERT_EQ (0, system.nodes [0]->balance (key2.pub));
ASSERT_EQ (amount, system.nodes [1]->balance (rai::test_genesis_key.pub));
ASSERT_EQ (0, system.nodes [1]->balance (key2.pub));
system.nodes [0]->process_receive_republish (block1);
system.nodes [1]->process_receive_republish (block1);
system.nodes [0]->process_active (block1);
system.nodes [1]->process_active (block1);
ASSERT_EQ (amount - system.nodes [0]->config.receive_minimum.number (), system.nodes [0]->balance (rai::test_genesis_key.pub));
ASSERT_EQ (0, system.nodes [0]->balance (key2.pub));
ASSERT_EQ (amount - system.nodes [0]->config.receive_minimum.number (), system.nodes [1]->balance (rai::test_genesis_key.pub));
@ -515,6 +515,7 @@ TEST (bootstrap_processor, process_one)
++iterations;
ASSERT_LT (iterations, 200);
}
ASSERT_EQ (0, node1->active.roots.size ());
node1->stop ();
}

View file

@ -163,8 +163,8 @@ TEST (node, send_out_of_order)
rai::genesis genesis;
rai::send_block send1 (genesis.hash (), key2.pub, std::numeric_limits <rai::uint128_t>::max () - system.nodes [0]->config.receive_minimum.number (), rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ()));
rai::send_block send2 (send1.hash (), key2.pub, std::numeric_limits <rai::uint128_t>::max () - system.nodes [0]->config.receive_minimum.number () * 2, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (send1.hash ()));
system.nodes [0]->process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (send2)));
system.nodes [0]->process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (send1)));
system.nodes [0]->process_active (std::unique_ptr <rai::block> (new rai::send_block (send2)));
system.nodes [0]->process_active (std::unique_ptr <rai::block> (new rai::send_block (send1)));
auto iterations (0);
while (std::any_of (system.nodes.begin (), system.nodes.end (), [&] (std::shared_ptr <rai::node> const & node_a) {return node_a->balance (rai::test_genesis_key.pub) != rai::genesis_amount - system.nodes [0]->config.receive_minimum.number () * 2;}))
{
@ -181,7 +181,7 @@ TEST (node, quick_confirm)
rai::block_hash previous (system.nodes [0]->latest (rai::test_genesis_key.pub));
system.wallet (0)->insert_adhoc (key.prv);
auto send (std::make_shared <rai::send_block> (previous, key.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (previous)));
system.nodes [0]->process_receive_republish (send);
system.nodes [0]->process_active (send);
auto iterations (0);
while (system.nodes [0]->balance (key.pub).is_zero ())
{
@ -707,13 +707,13 @@ TEST (node, fork_publish)
auto send1 (std::make_shared <rai::send_block> (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
rai::keypair key2;
auto send2 (std::make_shared <rai::send_block> (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
node1.process_receive_republish (send1);
node1.process_active (send1);
ASSERT_EQ (1, node1.active.roots.size ());
auto existing (node1.active.roots.find (send1->root ()));
ASSERT_NE (node1.active.roots.end (), existing);
auto election (existing->election);
ASSERT_EQ (2, election->votes.rep_votes.size ());
node1.process_receive_republish (send2);
node1.process_active (send2);
auto existing1 (election->votes.rep_votes.find (rai::test_genesis_key.pub));
ASSERT_NE (election->votes.rep_votes.end (), existing1);
ASSERT_EQ (*send1, *existing1->second);
@ -738,12 +738,12 @@ TEST (node, fork_keep)
// send1 and send2 fork to different accounts
auto send1 (std::make_shared <rai::send_block> (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
auto send2 (std::make_shared <rai::send_block> (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
node1.process_receive_republish (send1);
node2.process_receive_republish (send1);
node1.process_active (send1);
node2.process_active (send1);
ASSERT_EQ (1, node1.active.roots.size ());
ASSERT_EQ (1, node2.active.roots.size ());
node1.process_receive_republish (send2);
node2.process_receive_republish (send2);
node1.process_active (send2);
node2.process_active (send2);
auto conflict (node2.active.roots.find (genesis.hash ()));
ASSERT_NE (node2.active.roots.end (), conflict);
auto votes1 (conflict->election);
@ -960,22 +960,22 @@ TEST (node, fork_open_flip)
rai::keypair rep1;
rai::keypair rep2;
auto send1 (std::make_shared <rai::send_block> (genesis.hash (), key1.pub, rai::genesis_amount - 1, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
node1.process_receive_republish (send1);
node2.process_receive_republish (send1);
node1.process_active (send1);
node2.process_active (send1);
// We should be keeping this block
auto open1 (std::make_shared <rai::open_block> (send1->hash (), rep1.pub, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)));
// This block should be evicted
auto open2 (std::make_shared <rai::open_block> (send1->hash (), rep2.pub, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)));
ASSERT_FALSE (*open1 == *open2);
// node1 gets copy that will remain
node1.process_receive_republish (open1);
node1.process_active (open1);
// node2 gets copy that will be evicted
node2.process_receive_republish (open2);
node2.process_active (open2);
ASSERT_EQ (2, node1.active.roots.size ());
ASSERT_EQ (2, node2.active.roots.size ());
// Notify both nodes that a fork exists
node1.process_receive_republish (open2);
node2.process_receive_republish (open1);
node1.process_active (open2);
node2.process_active (open1);
auto conflict (node2.active.roots.find (open1->root ()));
ASSERT_NE (node2.active.roots.end (), conflict);
auto votes1 (conflict->election);
@ -1112,12 +1112,12 @@ TEST (node, broadcast_elected)
system.wallet (2)->insert_adhoc (rep_other.prv);
auto fork0 (std::make_shared <rai::send_block> (node2->latest (rai::test_genesis_key.pub), rep_small.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
node0->generate_work (*fork0);
node0->process_receive_republish (fork0);
node1->process_receive_republish (fork0);
node0->process_active (fork0);
node1->process_active (fork0);
auto fork1 (std::make_shared <rai::send_block> (node2->latest (rai::test_genesis_key.pub), rep_big.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
node0->generate_work (*fork1);
system.wallet (2)->insert_adhoc (rep_small.prv);
node2->process_receive_republish (fork1);
node2->process_active (fork1);
//std::cerr << "fork0: " << fork_hash.to_string () << std::endl;
//std::cerr << "fork1: " << fork1.hash ().to_string () << std::endl;
auto iterations (0);

View file

@ -52,7 +52,7 @@ TEST (work, cancel_many)
pool.cancel (key1);
}
TEST (work, opencl)
TEST (work, DISABLED_opencl)
{
rai::logging logging;
logging.init (rai::unique_path ());

View file

@ -1548,7 +1548,7 @@ void rai::bulk_push_server::received_block (boost::system::error_code const & ec
{
if (!connection->node->bootstrap_initiator.in_progress ())
{
connection->node->process_receive_republish (std::move (block));
connection->node->process_active (std::move (block));
}
receive ();
}

View file

@ -358,8 +358,7 @@ public:
++node.network.incoming.publish;
node.peers.contacted (sender, message_a.version_using);
node.peers.insert (sender, message_a.version_using);
node.block_arrival.add (message_a.block->hash ());
node.process_receive_republish (message_a.block);
node.process_active (message_a.block);
}
void confirm_req (rai::confirm_req const & message_a) override
{
@ -370,8 +369,7 @@ public:
++node.network.incoming.confirm_req;
node.peers.contacted (sender, message_a.version_using);
node.peers.insert (sender, message_a.version_using);
node.block_arrival.add (message_a.block->hash ());
node.process_receive_republish (message_a.block);
node.process_active (message_a.block);
rai::transaction transaction_a (node.store.environment, nullptr, false);
if (node.store.block_exists (transaction_a, message_a.block->hash ()))
{
@ -387,8 +385,7 @@ public:
++node.network.incoming.confirm_ack;
node.peers.contacted (sender, message_a.version_using);
node.peers.insert (sender, message_a.version_using);
node.block_arrival.add (message_a.vote.block->hash ());
node.process_receive_republish (message_a.vote.block);
node.process_active (message_a.vote.block);
node.vote_processor.vote (message_a.vote, sender);
}
void bulk_pull (rai::bulk_pull const &) override
@ -1317,6 +1314,14 @@ block_processor (*this)
observers.disconnect ();
};
observers.blocks.add ([this] (std::shared_ptr <rai::block> block_a, rai::account const & account_a, rai::amount const & amount_a)
{
if (this->block_arrival.recent (block_a->hash ()))
{
rai::transaction transaction (store.environment, nullptr, true);
active.start (transaction, block_a);
}
});
observers.blocks.add ([this] (std::shared_ptr <rai::block> block_a, rai::account const & account_a, rai::amount const & amount_a)
{
if (this->block_arrival.recent (block_a->hash ()))
{
@ -1588,29 +1593,14 @@ void rai::network::confirm_send (rai::confirm_ack const & confirm_a, std::shared
});
}
void rai::node::process_receive_republish (std::shared_ptr <rai::block> incoming)
void rai::node::process_active (std::shared_ptr <rai::block> incoming)
{
assert (incoming != nullptr);
auto node_l (shared_from_this ());
block_processor.add (incoming, [node_l] (MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr <rai::block> block_a)
{
switch (result_a.code)
{
case rai::process_result::progress:
{
node_l->active.start (transaction_a, block_a);
break;
}
default:
{
break;
}
}
});
if (rai::rai_network == rai::rai_networks::rai_test_network)
{
block_processor.flush ();
}
block_arrival.add (incoming->hash ());
block_processor.process_receive_many (incoming);
if (rai::rai_network == rai::rai_networks::rai_test_network)
{
block_processor.flush ();
}
}
rai::process_return rai::node::process (rai::block const & block_a)
@ -2258,15 +2248,16 @@ void rai::block_arrival::add (rai::block_hash const & hash_a)
std::lock_guard <std::mutex> lock (mutex);
auto now (std::chrono::system_clock::now ());
arrival.insert (rai::block_arrival_info {now, hash_a});
while (!arrival.empty () && arrival.begin ()->arrival + std::chrono::seconds (60) < now)
{
arrival.erase (arrival.begin ());
}
}
bool rai::block_arrival::recent (rai::block_hash const & hash_a)
{
std::lock_guard <std::mutex> lock (mutex);
std::lock_guard <std::mutex> lock (mutex);
auto now (std::chrono::system_clock::now ());
while (!arrival.empty () && arrival.begin ()->arrival + std::chrono::seconds (60) < now)
{
arrival.erase (arrival.begin ());
}
return arrival.get <1> ().find (hash_a) != arrival.get <1> ().end ();
}

View file

@ -439,8 +439,8 @@ public:
~block_processor ();
void stop ();
void flush ();
void add (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
void process_receive_many (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
void add (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
void process_receive_many (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
rai::process_return process_receive_one (MDB_txn *, std::shared_ptr <rai::block>);
private:
void process_blocks ();
@ -471,7 +471,7 @@ public:
int store_version ();
void process_confirmed (std::shared_ptr <rai::block>);
void process_message (rai::message &, rai::endpoint const &);
void process_receive_republish (std::shared_ptr <rai::block>);
void process_active (std::shared_ptr <rai::block>);
rai::process_return process (rai::block const &);
void keepalive_preconfigured (std::vector <std::string> const &);
rai::block_hash latest (rai::account const &);

View file

@ -1788,7 +1788,7 @@ void rai::rpc_handler::process ()
{
if (!node.work.work_validate (*block))
{
node.process_receive_republish (std::move (block));
node.process_active (std::move (block));
boost::property_tree::ptree response_l;
response (response_l);
}

View file

@ -1035,7 +1035,7 @@ std::shared_ptr <rai::block> rai::wallet::receive_action (rai::send_block const
if (block != nullptr)
{
assert (block != nullptr);
node.process_receive_republish (block);
node.process_active (block);
auto hash (block->hash ());
auto this_l (shared_from_this ());
auto source (send_a.hashables.destination);
@ -1073,7 +1073,7 @@ std::shared_ptr <rai::block> rai::wallet::change_action (rai::account const & so
if (block != nullptr)
{
assert (block != nullptr);
node.process_receive_republish (block);
node.process_active (block);
auto hash (block->hash ());
auto this_l (shared_from_this ());
node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash]
@ -1114,7 +1114,7 @@ std::shared_ptr <rai::block> rai::wallet::send_action (rai::account const & sour
if (block != nullptr)
{
assert (block != nullptr);
node.process_receive_republish (block);
node.process_active (block);
auto hash (block->hash ());
auto this_l (shared_from_this ());
node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash]

View file

@ -1673,7 +1673,7 @@ wallet (wallet_a)
{
show_label_ok (*status);
this->status->setText ("");
this->wallet.node.process_receive_republish (std::move (block_l));
this->wallet.node.process_active (std::move (block_l));
}
else
{