Changing confirmation to be passive.

Adding time based confirmation if there are lack of conflicts observed.
This commit is contained in:
clemahieu 2014-12-06 19:08:35 -06:00
commit 920201c44e
10 changed files with 899 additions and 841 deletions

View file

@ -51,6 +51,7 @@ add_executable (core_test
rai/core_test/block_path.cpp
rai/core_test/block_store.cpp
rai/core_test/client.cpp
rai/core_test/conflicts.cpp
rai/core_test/daemon.cpp
rai/core_test/entry.cpp
rai/core_test/ledger.cpp

View file

@ -136,23 +136,30 @@ void rai::network::publish_block (boost::asio::ip::udp::endpoint const & endpoin
{
client.log.add (boost::str (boost::format ("Publish %1% to %2%") % block->hash ().to_string () % endpoint_a));
}
rai::publish message (std::move (block));
std::shared_ptr <std::vector <uint8_t>> bytes (new std::vector <uint8_t>);
if (client.is_representative ())
{
rai::vectorstream stream (*bytes);
message.serialize (stream);
confirm_block (std::move (block), 0);
}
auto client_l (client.shared ());
send_buffer (bytes->data (), bytes->size (), endpoint_a, [bytes, client_l] (boost::system::error_code const & ec, size_t size)
else
{
rai::publish message (std::move (block));
std::shared_ptr <std::vector <uint8_t>> bytes (new std::vector <uint8_t>);
{
if (network_logging ())
rai::vectorstream stream (*bytes);
message.serialize (stream);
}
auto client_l (client.shared ());
send_buffer (bytes->data (), bytes->size (), endpoint_a, [bytes, client_l] (boost::system::error_code const & ec, size_t size)
{
if (ec)
if (network_logging ())
{
client_l->log.add (boost::str (boost::format ("Error sending publish: %1%") % ec.message ()));
if (ec)
{
client_l->log.add (boost::str (boost::format ("Error sending publish: %1%") % ec.message ()));
}
}
}
});
});
}
}
void rai::network::send_confirm_req (boost::asio::ip::udp::endpoint const & endpoint_a, rai::block const & block)
@ -367,7 +374,7 @@ void rai::network::receive_action (boost::system::error_code const & error, size
{
if (network_logging ())
{
client.log.add ("Receive error, shutting down network");
client.log.add (boost::str (boost::format ("Receive error, shutting down network: %1%") % error.message ()));
}
}
}
@ -887,7 +894,16 @@ service (processor_a)
{
if (wallet.find (block_a.hashables.destination) != wallet.end ())
{
conflicts.start (block_a, true);
conflicts.start (block_a, false);
auto root (store.root (block_a));
std::shared_ptr <rai::block> block_l (block_a.clone ().release ());
service.add (std::chrono::system_clock::now () + rai::confirm_wait, [this, root, block_l] ()
{
if (conflicts.no_conflict (root))
{
processor.process_confirmed (*block_l);
}
});
}
});
if (!init_a.error ())
@ -988,43 +1004,6 @@ void rai::processor::republish (std::unique_ptr <rai::block> incoming_a, rai::en
republisher->run ();
}
namespace
{
class republish_visitor : public rai::block_visitor
{
public:
republish_visitor (std::shared_ptr <rai::client> client_a, std::unique_ptr <rai::block> incoming_a, rai::endpoint const & sender_a) :
client (client_a),
incoming (std::move (incoming_a)),
sender (sender_a)
{
assert (client_a->store.block_exists (incoming->hash ()));
}
void send_block (rai::send_block const & block_a)
{
if (client->wallet.find (block_a.hashables.destination) == client->wallet.end ())
{
client->processor.republish (std::move (incoming), sender);
}
}
void receive_block (rai::receive_block const & block_a)
{
client->processor.republish (std::move (incoming), sender);
}
void open_block (rai::open_block const & block_a)
{
client->processor.republish (std::move (incoming), sender);
}
void change_block (rai::change_block const & block_a)
{
client->processor.republish (std::move (incoming), sender);
}
std::shared_ptr <rai::client> client;
std::unique_ptr <rai::block> incoming;
rai::endpoint sender;
};
}
rai::gap_cache::gap_cache () :
max (128)
{
@ -1149,8 +1128,7 @@ void rai::processor::process_receive_republish (std::unique_ptr <rai::block> inc
{
case rai::process_result::progress:
{
republish_visitor visitor (client.shared (), std::move (block), sender_a);
visitor.incoming->visit (visitor);
republish (std::move (block), sender_a);
break;
}
default:
@ -3881,6 +3859,19 @@ void rai::conflicts::start (rai::block const & block_a, bool request_a)
}
}
bool rai::conflicts::no_conflict (rai::block_hash const & hash_a)
{
std::lock_guard <std::mutex> lock (mutex);
auto result (true);
auto existing (roots.find (hash_a));
if (existing != roots.end ())
{
auto size (existing->second->votes.rep_votes.size ());
result = size == 0 || size == 1;
}
return result;
}
void rai::conflicts::update (rai::vote const & vote_a)
{
std::lock_guard <std::mutex> lock (mutex);
@ -4014,7 +4005,6 @@ public:
{
auto error (client.transactions.receive (block_a, prv, client.representative));
prv.bytes.fill (0);
assert (!error);
}
else
{

File diff suppressed because it is too large Load diff

View file

@ -136,6 +136,53 @@ TEST (client, send_out_of_order)
}
}
TEST (client, bootstrap_end)
{
rai::system system (24000, 1);
rai::client_init init1;
auto client1 (std::make_shared <rai::client> (init1, system.service, 24001, system.processor, rai::test_genesis_key.pub));
ASSERT_FALSE (init1.error ());
client1->start ();
ASSERT_NE (nullptr, client1->processor.bootstrapped);
ASSERT_EQ (0, client1->processor.bootstrapped->size ());
for (auto i (0); i < rai::processor::bootstrap_max; ++i)
{
client1->processor.bootstrapped->insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), 24002 + i));
}
client1->network.send_keepalive (system.clients [0]->network.endpoint ());
auto iterations (0);
do
{
system.service->poll_one ();
system.processor.poll_one ();
++iterations;
ASSERT_LT (iterations, 200);
} while (client1->processor.bootstrapped != nullptr);
client1->stop ();
}
TEST (client, quick_confirm)
{
rai::system system (24000, 1);
rai::keypair key;
system.clients [0]->wallet.insert (key.prv);
rai::send_block send;
send.hashables.balance = 0;
send.hashables.destination = key.pub;
send.hashables.previous = system.clients [0]->ledger.latest (rai::test_genesis_key.pub);
send.work = system.clients [0]->ledger.create_work (send);
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send.hash (), send.signature);
ASSERT_EQ (rai::process_result::progress, system.clients [0]->processor.process_receive (send));
auto iterations (0);
while (system.clients [0]->ledger.account_balance (key.pub).is_zero ())
{
system.processor.poll_one ();
system.service->poll_one ();
++iterations;
ASSERT_LT (iterations, 200);
}
}
TEST (client, auto_bootstrap)
{
rai::system system (24000, 1);
@ -169,31 +216,6 @@ TEST (client, auto_bootstrap)
client1->stop ();
}
TEST (client, bootstrap_end)
{
rai::system system (24000, 1);
rai::client_init init1;
auto client1 (std::make_shared <rai::client> (init1, system.service, 24001, system.processor, rai::test_genesis_key.pub));
ASSERT_FALSE (init1.error ());
client1->start ();
ASSERT_NE (nullptr, client1->processor.bootstrapped);
ASSERT_EQ (0, client1->processor.bootstrapped->size ());
for (auto i (0); i < rai::processor::bootstrap_max; ++i)
{
client1->processor.bootstrapped->insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), 24002 + i));
}
client1->network.send_keepalive (system.clients [0]->network.endpoint ());
auto iterations (0);
do
{
system.service->poll_one ();
system.processor.poll_one ();
++iterations;
ASSERT_LT (iterations, 200);
} while (client1->processor.bootstrapped != nullptr);
client1->stop ();
}
TEST (client, auto_bootstrap_reverse)
{
rai::system system (24000, 1);

View file

@ -0,0 +1,89 @@
#include <gtest/gtest.h>
#include <rai/core/core.hpp>
TEST (conflicts, start_stop)
{
rai::system system (24000, 1);
auto & client1 (*system.clients [0]);
rai::genesis genesis;
rai::send_block send1;
rai::keypair key1;
send1.hashables.previous = genesis.hash ();
send1.hashables.balance.clear ();
send1.hashables.destination = key1.pub;
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send1.hash (), send1.signature);
ASSERT_EQ (rai::process_result::progress, client1.ledger.process (send1));
ASSERT_EQ (0, client1.conflicts.roots.size ());
ASSERT_TRUE (client1.conflicts.no_conflict (send1.hashables.previous));
client1.conflicts.start (send1, false);
ASSERT_TRUE (client1.conflicts.no_conflict (send1.hashables.previous));
ASSERT_EQ (1, client1.conflicts.roots.size ());
auto root1 (client1.store.root (send1));
auto existing1 (client1.conflicts.roots.find (root1));
ASSERT_NE (client1.conflicts.roots.end (), existing1);
auto votes1 (existing1->second);
ASSERT_NE (nullptr, votes1);
ASSERT_EQ (1, votes1->votes.rep_votes.size ());
client1.conflicts.stop (root1);
ASSERT_EQ (0, client1.conflicts.roots.size ());
}
TEST (conflicts, add_existing)
{
rai::system system (24000, 1);
auto & client1 (*system.clients [0]);
rai::genesis genesis;
rai::send_block send1;
rai::keypair key1;
send1.hashables.previous = genesis.hash ();
send1.hashables.balance.clear ();
send1.hashables.destination = key1.pub;
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send1.hash (), send1.signature);
ASSERT_EQ (rai::process_result::progress, client1.ledger.process (send1));
client1.conflicts.start (send1, false);
rai::send_block send2;
rai::keypair key2;
send2.hashables.previous = genesis.hash ();
send2.hashables.balance.clear ();
send2.hashables.destination = key2.pub;
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send2.hash (), send2.signature);
client1.conflicts.start (send2, false);
ASSERT_EQ (1, client1.conflicts.roots.size ());
rai::vote vote1;
vote1.account = key2.pub;
vote1.sequence = 0;
vote1.block = send2.clone ();
rai::sign_message (key2.prv, key2.pub, vote1.hash (), vote1.signature);
ASSERT_TRUE (client1.conflicts.no_conflict (send1.hashables.previous));
client1.conflicts.update (vote1);
ASSERT_FALSE (client1.conflicts.no_conflict (send1.hashables.previous));
ASSERT_EQ (1, client1.conflicts.roots.size ());
auto votes1 (client1.conflicts.roots [client1.store.root (send2)]);
ASSERT_NE (nullptr, votes1);
ASSERT_EQ (2, votes1->votes.rep_votes.size ());
ASSERT_NE (votes1->votes.rep_votes.end (), votes1->votes.rep_votes.find (key2.pub));
}
TEST (conflicts, add_two)
{
rai::system system (24000, 1);
auto & client1 (*system.clients [0]);
rai::genesis genesis;
rai::send_block send1;
rai::keypair key1;
send1.hashables.previous = genesis.hash ();
send1.hashables.balance.clear ();
send1.hashables.destination = key1.pub;
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send1.hash (), send1.signature);
ASSERT_EQ (rai::process_result::progress, client1.ledger.process (send1));
client1.conflicts.start (send1, false);
rai::send_block send2;
rai::keypair key2;
send2.hashables.previous = send1.hash ();
send2.hashables.balance.clear ();
send2.hashables.destination = key2.pub;
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send2.hash (), send2.signature);
ASSERT_EQ (rai::process_result::progress, client1.ledger.process (send2));
client1.conflicts.start (send2, false);
ASSERT_EQ (2, client1.conflicts.roots.size ());
}

View file

@ -962,89 +962,6 @@ TEST (votes, add_old)
ASSERT_EQ (send1, *winner.first);
}
TEST (conflicts, start_stop)
{
rai::system system (24000, 1);
auto & client1 (*system.clients [0]);
rai::genesis genesis;
rai::send_block send1;
rai::keypair key1;
send1.hashables.previous = genesis.hash ();
send1.hashables.balance.clear ();
send1.hashables.destination = key1.pub;
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send1.hash (), send1.signature);
ASSERT_EQ (rai::process_result::progress, client1.ledger.process (send1));
ASSERT_EQ (0, client1.conflicts.roots.size ());
client1.conflicts.start (send1, false);
ASSERT_EQ (1, client1.conflicts.roots.size ());
auto root1 (client1.store.root (send1));
auto existing1 (client1.conflicts.roots.find (root1));
ASSERT_NE (client1.conflicts.roots.end (), existing1);
auto votes1 (existing1->second);
ASSERT_NE (nullptr, votes1);
ASSERT_EQ (1, votes1->votes.rep_votes.size ());
client1.conflicts.stop (root1);
ASSERT_EQ (0, client1.conflicts.roots.size ());
}
TEST (conflicts, add_existing)
{
rai::system system (24000, 1);
auto & client1 (*system.clients [0]);
rai::genesis genesis;
rai::send_block send1;
rai::keypair key1;
send1.hashables.previous = genesis.hash ();
send1.hashables.balance.clear ();
send1.hashables.destination = key1.pub;
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send1.hash (), send1.signature);
ASSERT_EQ (rai::process_result::progress, client1.ledger.process (send1));
client1.conflicts.start (send1, false);
rai::send_block send2;
rai::keypair key2;
send2.hashables.previous = genesis.hash ();
send2.hashables.balance.clear ();
send2.hashables.destination = key2.pub;
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send2.hash (), send2.signature);
client1.conflicts.start (send2, false);
ASSERT_EQ (1, client1.conflicts.roots.size ());
rai::vote vote1;
vote1.account = key2.pub;
vote1.sequence = 0;
vote1.block = send2.clone ();
rai::sign_message (key2.prv, key2.pub, vote1.hash (), vote1.signature);
client1.conflicts.update (vote1);
ASSERT_EQ (1, client1.conflicts.roots.size ());
auto votes1 (client1.conflicts.roots [client1.store.root (send2)]);
ASSERT_NE (nullptr, votes1);
ASSERT_EQ (2, votes1->votes.rep_votes.size ());
ASSERT_NE (votes1->votes.rep_votes.end (), votes1->votes.rep_votes.find (key2.pub));
}
TEST (conflicts, add_two)
{
rai::system system (24000, 1);
auto & client1 (*system.clients [0]);
rai::genesis genesis;
rai::send_block send1;
rai::keypair key1;
send1.hashables.previous = genesis.hash ();
send1.hashables.balance.clear ();
send1.hashables.destination = key1.pub;
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send1.hash (), send1.signature);
ASSERT_EQ (rai::process_result::progress, client1.ledger.process (send1));
client1.conflicts.start (send1, false);
rai::send_block send2;
rai::keypair key2;
send2.hashables.previous = send1.hash ();
send2.hashables.balance.clear ();
send2.hashables.destination = key2.pub;
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, send2.hash (), send2.signature);
ASSERT_EQ (rai::process_result::progress, client1.ledger.process (send2));
client1.conflicts.start (send2, false);
ASSERT_EQ (2, client1.conflicts.roots.size ());
}
TEST (ledger, successor)
{
rai::system system (24000, 1);

View file

@ -234,7 +234,7 @@ TEST (network, send_invalid_publish)
ASSERT_EQ (genesis.hash (), system.clients [1]->ledger.latest (rai::test_genesis_key.pub));
}
TEST (network, send_valid_publish)
TEST (network, send_valid_confirm_ack)
{
rai::system system (24000, 2);
system.clients [0]->wallet.insert (rai::test_genesis_key.prv);
@ -253,7 +253,39 @@ TEST (network, send_valid_publish)
ASSERT_FALSE (system.clients [1]->store.latest_get (rai::test_genesis_key.pub, frontier2));
system.clients [0]->processor.process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (block2)), system.clients [0]->network.endpoint ());
auto iterations (0);
while (system.clients [1]->network.publish_req_count == 0)
while (system.clients [1]->network.confirm_ack_count == 0)
{
system.service->poll_one ();
++iterations;
ASSERT_LT (iterations, 200);
}
rai::frontier frontier3;
ASSERT_FALSE (system.clients [1]->store.latest_get (rai::test_genesis_key.pub, frontier3));
ASSERT_FALSE (frontier2.hash == frontier3.hash);
ASSERT_EQ (hash2, frontier3.hash);
ASSERT_EQ (50, system.clients [1]->ledger.account_balance (rai::test_genesis_key.pub));
}
TEST (network, send_valid_publish)
{
rai::system system (24000, 2);
system.clients [0]->wallet.insert (rai::test_genesis_key.prv);
rai::keypair key2;
system.clients [1]->wallet.insert (key2.prv);
rai::send_block block2;
rai::frontier frontier1;
ASSERT_FALSE (system.clients [0]->store.latest_get (rai::test_genesis_key.pub, frontier1));
block2.hashables.previous = frontier1.hash;
block2.hashables.balance = 50;
block2.hashables.destination = key2.pub;
block2.work = system.clients [0]->ledger.create_work (block2);
auto hash2 (block2.hash ());
rai::sign_message (rai::test_genesis_key.prv, rai::test_genesis_key.pub, hash2, block2.signature);
rai::frontier frontier2;
ASSERT_FALSE (system.clients [1]->store.latest_get (rai::test_genesis_key.pub, frontier2));
system.clients [1]->processor.process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (block2)), system.clients [0]->network.endpoint ());
auto iterations (0);
while (system.clients [0]->network.publish_req_count == 0)
{
system.service->poll_one ();
++iterations;
@ -349,17 +381,17 @@ TEST (receivable_processor, send_with_receive)
ASSERT_EQ (0, system.clients [0]->ledger.account_balance (key2.pub));
ASSERT_EQ (amount, system.clients [1]->ledger.account_balance (rai::test_genesis_key.pub));
ASSERT_EQ (0, system.clients [1]->ledger.account_balance (key2.pub));
ASSERT_EQ (rai::process_result::progress, system.clients [0]->ledger.process (*block1));
ASSERT_EQ (rai::process_result::progress, system.clients [1]->ledger.process (*block1));
system.clients [0]->processor.process_receive_republish (block1->clone (), rai::endpoint ());
system.clients [1]->processor.process_receive_republish (block1->clone (), rai::endpoint ());
ASSERT_EQ (amount - 100, system.clients [0]->ledger.account_balance (rai::test_genesis_key.pub));
ASSERT_EQ (0, system.clients [0]->ledger.account_balance (key2.pub));
ASSERT_EQ (amount - 100, system.clients [1]->ledger.account_balance (rai::test_genesis_key.pub));
ASSERT_EQ (0, system.clients [1]->ledger.account_balance (key2.pub));
system.clients [1]->conflicts.start (*block1, true);
auto iterations (0);
while (system.clients [0]->network.publish_req_count != 1)
while (system.clients [0]->ledger.account_balance (key2.pub) != 100)
{
system.service->poll_one ();
system.processor.poll_one ();
++iterations;
ASSERT_LT (iterations, 200);
}

View file

@ -19,6 +19,7 @@ rai::account const rai::rai_beta_account (rai_beta_public_key);
rai::account const rai::rai_live_account (rai_live_public_key);
rai::account const rai::genesis_account = rai_network == rai_networks::rai_test_network ? rai_test_account : rai_network == rai_networks::rai_beta_network ? rai_beta_account : rai_live_account;
std::chrono::milliseconds const rai::confirm_wait = rai_network == rai_networks::rai_test_network ? std::chrono::milliseconds (0) : std::chrono::milliseconds (5000);
CryptoPP::AutoSeededRandomPool rai::random_pool;
@ -126,7 +127,8 @@ rai::votes::votes (rai::ledger & ledger_a, rai::block const & block_a) :
ledger (ledger_a),
root (ledger.store.root (block_a)),
last_winner (block_a.clone ()),
sequence (0)
// Sequence 0 is the first response by a representative before a fork was observed
sequence (1)
{
}

View file

@ -543,6 +543,7 @@ namespace rai
rai::block_hash hash () const;
rai::open_block open;
};
extern std::chrono::milliseconds const confirm_wait;
enum class rai_networks
{
rai_test_network,

View file

@ -1,6 +1,8 @@
#include <gtest/gtest.h>
#include <rai/core/core.hpp>
#include <thread>
TEST (system, generate_mass_activity)
{
rai::system system (24000, 1);