Adding testing for receivable processor.

This commit is contained in:
clemahieu 2014-06-09 23:29:41 -05:00
commit 4020cef692
3 changed files with 149 additions and 52 deletions

View file

@ -1043,10 +1043,11 @@ void mu_coin::network::receive_action (boost::system::error_code const & error,
receive ();
if (!error)
{
auto hash (incoming->block->hash ());
auto error (client.processor.process_publish (std::unique_ptr <mu_coin::publish_req> (incoming)));
if (!error)
{
mu_coin::publish_con outgoing;
mu_coin::publish_con outgoing {hash};
mu_coin::byte_write_stream stream;
outgoing.serialize (stream);
auto data (stream.data);
@ -1443,15 +1444,14 @@ public:
bool result;
};
class receivable_processor;
class receivable_message_processor : public mu_coin::message_visitor
{
public:
receivable_message_processor (receivable_processor & processor_a) :
receivable_message_processor (mu_coin::receivable_processor & processor_a) :
processor (processor_a)
{
}
receivable_processor & processor;
mu_coin::receivable_processor & processor;
void keepalive_req (mu_coin::keepalive_req const &)
{
assert (false);
@ -1471,58 +1471,51 @@ public:
void process_authorizations (mu_coin::block_hash const &, std::vector <mu_coin::authorization> const &);
};
class receivable_processor : public std::enable_shared_from_this <receivable_processor>
mu_coin::receivable_processor::receivable_processor (std::unique_ptr <mu_coin::publish_req> incoming_a, mu_coin::client & client_a) :
threshold (client_a.ledger.supply () / 2),
incoming (std::move (incoming_a)),
client (client_a),
complete (false)
{
public:
receivable_processor (std::unique_ptr <mu_coin::publish_req> incoming_a, mu_coin::client & client_a) :
threshold (client_a.ledger.supply () / 2),
incoming (std::move (incoming_a)),
client (client_a),
complete (false)
}
void mu_coin::receivable_processor::run ()
{
auto this_l (shared_from_this ());
advance_timeout ();
client.network.add_publish_listener (incoming->block->hash (), [this_l] (std::unique_ptr <mu_coin::message> message_a, mu_coin::endpoint const & endpoint_a) {this_l->publish_con (std::move (message_a), endpoint_a);});
auto peers (client.peers.list ());
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
{
client.network.publish_block (*i, incoming->block->clone ());
}
void run ()
}
void mu_coin::receivable_processor::publish_con (std::unique_ptr <mu_coin::message> message, const mu_coin::endpoint &source)
{
receivable_message_processor processor_l (*this);
message->visit (processor_l);
}
void mu_coin::receivable_processor::advance_timeout ()
{
auto this_l (shared_from_this ());
timeout = std::chrono::system_clock::now () + std::chrono::seconds (5);
client.processor.service.add (timeout, [this_l] () {this_l->timeout_action ();});
}
void mu_coin::receivable_processor::timeout_action ()
{
std::lock_guard <std::mutex> lock (mutex);
if (timeout < std::chrono::system_clock::now ())
{
auto this_l (shared_from_this ());
advance_timeout ();
client.processor.service.add (timeout, [this_l] () {this_l->timeout_action ();});
client.network.add_publish_listener (incoming->block->hash (), [this_l] (std::unique_ptr <mu_coin::message> message_a, mu_coin::endpoint const & endpoint_a) {this_l->publish_ack (std::move (message_a), endpoint_a);});
auto peers (client.peers.list ());
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
{
client.network.publish_block (*i, incoming->block->clone ());
}
}
void publish_ack (std::unique_ptr <mu_coin::message> message, mu_coin::endpoint const & source)
else
{
receivable_message_processor processor_l (*this);
message->visit (processor_l);
// Timeout signals may be invalid if we've received action since they were queued
}
void timeout_action ()
{
std::lock_guard <std::mutex> lock (mutex);
if (timeout < std::chrono::system_clock::now ())
{
}
else
{
// Timeout signals may be invalid if we've received action since they were queued
}
}
void advance_timeout ()
{
timeout = std::chrono::system_clock::now () + std::chrono::seconds (5);
}
mu_coin::uint256_t acknowledged;
mu_coin::uint256_t nacked;
mu_coin::uint256_t threshold;
std::chrono::system_clock::time_point timeout;
std::unique_ptr <mu_coin::publish_req> incoming;
mu_coin::client & client;
std::mutex mutex;
bool complete;
};
}
void receivable_message_processor::process_authorizations (mu_coin::block_hash const & block, std::vector <mu_coin::authorization> const & authorizations)
{
@ -1761,4 +1754,21 @@ void mu_coin::client::publish (std::unique_ptr <mu_coin::block> block)
mu_coin::uint256_t mu_coin::ledger::supply ()
{
return std::numeric_limits <mu_coin::uint256_t>::max ();
}
size_t mu_coin::processor_service::size ()
{
std::lock_guard <std::mutex> lock (mutex);
return operations.size ();
}
size_t mu_coin::network::publish_listener_size ()
{
std::lock_guard <std::mutex> lock (mutex);
return publish_listeners.size ();
}
mu_coin::publish_con::publish_con (mu_coin::block_hash const & block_a) :
block (block_a)
{
}

View file

@ -380,6 +380,7 @@ namespace mu_coin {
class publish_con : public message
{
public:
publish_con (mu_coin::block_hash const &);
bool deserialize (mu_coin::byte_read_stream &);
void serialize (mu_coin::byte_write_stream &);
void visit (mu_coin::message_visitor &) override;
@ -468,6 +469,7 @@ namespace mu_coin {
void add (std::chrono::system_clock::time_point const &, std::function <void ()> const &);
void stop ();
bool stopped ();
size_t size ();
private:
bool done;
std::mutex mutex;
@ -496,6 +498,7 @@ namespace mu_coin {
void publish_block (mu_coin::endpoint const &, std::unique_ptr <mu_coin::block>);
void add_publish_listener (mu_coin::block_hash const &, session const &);
void remove_publish_listener (mu_coin::block_hash const &);
size_t publish_listener_size ();
mu_coin::endpoint remote;
std::array <uint8_t, 4000> buffer;
boost::asio::ip::udp::socket socket;
@ -522,6 +525,23 @@ namespace mu_coin {
private:
std::mutex mutex;
std::unordered_set <boost::asio::ip::udp::endpoint> peers;
};
class receivable_processor : public std::enable_shared_from_this <receivable_processor>
{
public:
receivable_processor (std::unique_ptr <mu_coin::publish_req> incoming_a, mu_coin::client & client_a);
void run ();
void publish_con (std::unique_ptr <mu_coin::message> message, mu_coin::endpoint const & source);
void timeout_action ();
void advance_timeout ();
mu_coin::uint256_t acknowledged;
mu_coin::uint256_t nacked;
mu_coin::uint256_t threshold;
std::chrono::system_clock::time_point timeout;
std::unique_ptr <mu_coin::publish_req> incoming;
mu_coin::client & client;
std::mutex mutex;
bool complete;
};
class client
{

View file

@ -94,14 +94,17 @@ TEST (network, send_valid_publish)
{
boost::asio::io_service service;
mu_coin::processor_service processor;
mu_coin::secret_key secret;
mu_coin::keypair key1;
mu_coin::client client1 (service, 24001, processor);
client1.wallet.insert (key1.pub, key1.prv, secret);
client1.store.genesis_put (key1.pub, 100);
mu_coin::keypair key2;
mu_coin::client client2 (service, 24002, processor);
client2.wallet.insert (key2.pub, key2.prv, secret);
client2.store.genesis_put (key1.pub, 100);
client1.network.receive ();
client2.network.receive ();
mu_coin::keypair key2;
mu_coin::send_block block2;
mu_coin::block_hash hash1;
ASSERT_FALSE (client1.store.latest_get (key1.pub, hash1));
@ -113,15 +116,79 @@ TEST (network, send_valid_publish)
mu_coin::block_hash hash3;
ASSERT_FALSE (client2.store.latest_get (key1.pub, hash3));
client1.network.publish_block (client2.network.socket.local_endpoint (), std::unique_ptr <mu_coin::block> (new mu_coin::send_block (block2)));
while (client1.network.publish_con_count == 0)
while (client2.network.publish_con_count == 0)
{
service.run_one ();
}
ASSERT_EQ (1, client2.network.publish_req_count);
ASSERT_EQ (1, client1.network.publish_con_count);
ASSERT_EQ (1, client2.network.publish_con_count);
ASSERT_EQ (1, client1.network.publish_req_count);
ASSERT_EQ (1, client2.network.publish_req_count);
mu_coin::block_hash hash4;
ASSERT_FALSE (client2.store.latest_get (key1.pub, hash4));
ASSERT_FALSE (hash3 == hash4);
ASSERT_EQ (hash2, hash4);
ASSERT_EQ (49, client2.ledger.balance (key1.pub));
}
TEST (receivable_processor, timeout)
{
boost::asio::io_service io_service;
mu_coin::processor_service processor;
mu_coin::client client (io_service, 24001, processor);
auto receivable (std::make_shared <mu_coin::receivable_processor> (nullptr, client));
ASSERT_EQ (0, client.network.publish_listener_size ());
ASSERT_FALSE (receivable->complete);
ASSERT_EQ (0, processor.size ());
receivable->advance_timeout ();
ASSERT_EQ (1, processor.size ());
receivable->advance_timeout ();
ASSERT_EQ (2, processor.size ());
}
TEST (receivable_processor, confirm_no_pos)
{
boost::asio::io_service io_service;
mu_coin::processor_service processor;
mu_coin::client client1 (io_service, 24001, processor);
auto block1 (new mu_coin::send_block ());
auto receivable (std::make_shared <mu_coin::receivable_processor> (std::unique_ptr <mu_coin::publish_req> {new mu_coin::publish_req {std::unique_ptr <mu_coin::block> {block1}}}, client1));
receivable->run ();
ASSERT_EQ (1, client1.network.publish_listener_size ());
mu_coin::keypair key1;
mu_coin::publish_con con1 {block1->hash ()};
mu_coin::authorization auth1;
auth1.address = key1.pub;
mu_coin::sign_message (key1.prv, key1.pub, con1.block, auth1.signature);
con1.authorizations.push_back (auth1);
mu_coin::byte_write_stream stream;
con1.serialize (stream);
ASSERT_LE (stream.size, client1.network.buffer.size ());
std::copy (stream.data, stream.data + stream.size, client1.network.buffer.begin ());
client1.network.receive_action (boost::system::error_code {}, stream.size);
ASSERT_TRUE (receivable->acknowledged.is_zero ());
}
TEST (receivable_processor, confirm_insufficient_pos)
{
boost::asio::io_service io_service;
mu_coin::processor_service processor;
mu_coin::client client1 (io_service, 24001, processor);
mu_coin::keypair key1;
client1.ledger.store.genesis_put (key1.pub, 1);
auto block1 (new mu_coin::send_block ());
auto receivable (std::make_shared <mu_coin::receivable_processor> (std::unique_ptr <mu_coin::publish_req> {new mu_coin::publish_req {std::unique_ptr <mu_coin::block> {block1}}}, client1));
receivable->run ();
ASSERT_EQ (1, client1.network.publish_listener_size ());
mu_coin::publish_con con1 {block1->hash ()};
mu_coin::authorization auth1;
auth1.address = key1.pub;
mu_coin::sign_message (key1.prv, key1.pub, con1.block, auth1.signature);
con1.authorizations.push_back (auth1);
mu_coin::byte_write_stream stream;
con1.serialize (stream);
ASSERT_LE (stream.size, client1.network.buffer.size ());
std::copy (stream.data, stream.data + stream.size, client1.network.buffer.begin ());
client1.network.receive_action (boost::system::error_code {}, stream.size);
ASSERT_EQ (1, receivable->acknowledged);
}