Adding a bunch to receivable processing.

This commit is contained in:
clemahieu 2014-06-08 17:53:39 -05:00
commit e74f20fde6
2 changed files with 171 additions and 11 deletions

View file

@ -1443,36 +1443,64 @@ public:
bool result;
};
class receivable_processor;
class receivable_message_processor : public mu_coin::message_visitor
{
public:
receivable_message_processor (receivable_processor & processor_a) :
processor (processor_a)
{
}
receivable_processor & processor;
void keepalive_req (mu_coin::keepalive_req const &)
{
assert (false);
}
void keepalive_ack (mu_coin::keepalive_ack const &)
{
assert (false);
}
void publish_req (mu_coin::publish_req const &)
{
assert (false);
}
void publish_con (mu_coin::publish_con const &);
void publish_dup (mu_coin::publish_dup const &);
void publish_unk (mu_coin::publish_unk const &);
void publish_nak (mu_coin::publish_nak const &);
void process_authorizations (mu_coin::block_hash const &, std::vector <mu_coin::authorization> const &);
};
class receivable_processor : public std::enable_shared_from_this <receivable_processor>
{
public:
receivable_processor (std::unique_ptr <mu_coin::publish_req> incoming_a, mu_coin::client & client_a) :
threshold (client_a.ledger.supply () / 2),
incoming (std::move (incoming_a)),
client (client_a)
client (client_a),
complete (false)
{
}
void run ()
{
auto this_l (shared_from_this ());
timeout = std::chrono::system_clock::now () + std::chrono::seconds (5);
advance_timeout ();
client.processor.service.add (timeout, [this_l] () {this_l->timeout_action ();});
client.network.add_publish_ack_listener (incoming->block->hash (), [this_l] (std::unique_ptr <mu_coin::message> message_a, mu_coin::endpoint const & endpoint_a) {this_l->publish_ack (std::move (message_a), endpoint_a);});
client.network.add_publish_listener (incoming->block->hash (), [this_l] (std::unique_ptr <mu_coin::message> message_a, mu_coin::endpoint const & endpoint_a) {this_l->publish_ack (std::move (message_a), endpoint_a);});
auto peers (client.peers.list ());
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
{
client.network.publish_block (*i, incoming->block->clone ());
}
}
void publish_ack (std::unique_ptr <mu_coin::message> message, mu_coin::endpoint const &)
void publish_ack (std::unique_ptr <mu_coin::message> message, mu_coin::endpoint const & source)
{
receivable_message_processor processor_l (*this);
message->visit (processor_l);
}
void timeout_action ()
{
std::lock_guard <std::mutex> lock (mutex);
if (timeout < std::chrono::system_clock::now ())
{
@ -1482,11 +1510,119 @@ public:
// Timeout signals may be invalid if we've received action since they were queued
}
}
void advance_timeout ()
{
timeout = std::chrono::system_clock::now () + std::chrono::seconds (5);
}
mu_coin::uint256_t acknowledged;
mu_coin::uint256_t nacked;
mu_coin::uint256_t threshold;
std::chrono::system_clock::time_point timeout;
std::unique_ptr <mu_coin::publish_req> incoming;
mu_coin::client & client;
std::mutex mutex;
bool complete;
};
void receivable_message_processor::process_authorizations (mu_coin::block_hash const & block, std::vector <mu_coin::authorization> const & authorizations)
{
mu_coin::uint256_t acknowledged;
for (auto i (authorizations.begin ()), j (authorizations.end ()); i != j; ++i)
{
if (!mu_coin::validate_message (i->address, block, i->signature))
{
auto balance (processor.client.ledger.balance (i->address));
acknowledged += balance;
}
else
{
// Signature didn't match.
}
}
std::unique_lock <std::mutex> lock (processor.mutex);
if (!processor.complete)
{
processor.acknowledged += acknowledged;
if (processor.acknowledged > processor.threshold && processor.nacked.is_zero ())
{
processor.complete = true;
lock.release ();
assert (dynamic_cast <mu_coin::send_block *> (processor.incoming->block.get ()) != nullptr);
auto & send (static_cast <mu_coin::send_block &> (*processor.incoming->block.get ()));
auto hash (send.hash ());
for (auto i (send.outputs.begin ()), j (send.outputs.end ()); i != j; ++i)
{
mu_coin::private_key prv;
mu_coin::secret_key key;
if (!processor.client.wallet.fetch (i->destination, key, prv))
{
if (!processor.client.ledger.store.pending_get (i->destination, hash))
{
mu_coin::block_hash previous;
processor.client.ledger.store.latest_get (i->destination, previous);
auto receive (new mu_coin::receive_block);
receive->previous = previous;
receive->source = hash;
mu_coin::sign_message (prv, i->destination, receive->hash (), receive->signature);
prv.bytes.fill (0);
if (!processor.client.ledger.process (*receive))
{
processor.client.publish (std::unique_ptr <mu_coin::block> (receive));
}
else
{
assert (false); // Internal error, our own ledger doesn't accept the block, let's forget this ever happened...
}
}
else
{
// Ledger doesn't have this marked as available to receive anymore
}
}
else
{
// Wallet doesn't contain key for this destination or couldn't decrypt
}
}
}
else
{
processor.advance_timeout ();
}
}
}
void receivable_message_processor::publish_con (mu_coin::publish_con const & message)
{
process_authorizations (message.block, message.authorizations);
}
void receivable_message_processor::publish_dup (mu_coin::publish_dup const & message)
{
process_authorizations (message.block, message.authorizations);
}
void receivable_message_processor::publish_unk (mu_coin::publish_unk const & message)
{
}
void receivable_message_processor::publish_nak (mu_coin::publish_nak const & message)
{
auto block (message.block->hash ());
for (auto i (message.authorizations.begin ()), j (message.authorizations.end ()); i != j; ++i)
{
if (!mu_coin::validate_message (i->address, block, i->signature))
{
auto balance (processor.client.ledger.balance (i->address));
processor.nacked += balance;
}
else
{
// Signature didn't match.
}
}
}
void mu_coin::processor::process_receivable (std::unique_ptr <mu_coin::publish_req> incoming)
{
auto processor (std::make_shared <receivable_processor> (std::move (incoming), client));
@ -1518,16 +1654,16 @@ std::vector <boost::asio::ip::udp::endpoint> mu_coin::peer_container::list ()
return result;
}
void mu_coin::network::add_publish_ack_listener (mu_coin::block_hash const & block_a, session const & function_a)
void mu_coin::network::add_publish_listener (mu_coin::block_hash const & block_a, session const & function_a)
{
std::lock_guard <std::mutex> lock (mutex);
publish_ack_listeners [block_a] = function_a;
publish_listeners [block_a] = function_a;
}
void mu_coin::network::remove_publish_ack_listener (mu_coin::block_hash const & block_a)
void mu_coin::network::remove_publish_listener (mu_coin::block_hash const & block_a)
{
std::lock_guard <std::mutex> lock (mutex);
publish_ack_listeners.erase (block_a);
publish_listeners.erase (block_a);
}
void mu_coin::keepalive_req::visit (mu_coin::message_visitor & visitor_a)
@ -1615,4 +1751,14 @@ bool mu_coin::keepalive_req::deserialize (mu_coin::byte_read_stream & stream)
auto result (stream.read (type));
assert (type == mu_coin::message_type::keepalive_req);
return result;
}
void mu_coin::client::publish (std::unique_ptr <mu_coin::block> block)
{
auto list (peers.list ());
}
mu_coin::uint256_t mu_coin::ledger::supply ()
{
return std::numeric_limits <mu_coin::uint256_t>::max ();
}

View file

@ -100,6 +100,7 @@ namespace mu_coin {
void clear ();
boost::multiprecision::uint512_t number ();
};
using signature = uint512_union;
using endpoint = boost::asio::ip::udp::endpoint;
}
@ -300,6 +301,7 @@ namespace mu_coin {
public:
ledger (mu_coin::block_store &);
mu_coin::uint256_t balance (mu_coin::address const &);
mu_coin::uint256_t supply ();
mu_coin::process_result process (mu_coin::block const &);
mu_coin::block_store & store;
};
@ -338,6 +340,12 @@ namespace mu_coin {
publish_unk,
publish_nak
};
class authorization
{
public:
mu_coin::address address;
mu_coin::signature signature;
};
class message_visitor;
class message
{
@ -375,6 +383,8 @@ namespace mu_coin {
bool deserialize (mu_coin::byte_read_stream &);
void serialize (mu_coin::byte_write_stream &);
void visit (mu_coin::message_visitor &) override;
mu_coin::block_hash block;
std::vector <mu_coin::authorization> authorizations;
};
class publish_dup : public message
{
@ -382,6 +392,8 @@ namespace mu_coin {
bool deserialize (mu_coin::byte_read_stream &);
void serialize (mu_coin::byte_write_stream &);
void visit (mu_coin::message_visitor &) override;
mu_coin::block_hash block;
std::vector <mu_coin::authorization> authorizations;
};
class publish_unk : public message
{
@ -397,6 +409,7 @@ namespace mu_coin {
void serialize (mu_coin::byte_write_stream &);
void visit (mu_coin::message_visitor &) override;
std::unique_ptr <mu_coin::block> block; // Observed fork block
std::vector <mu_coin::authorization> authorizations;
};
class message_visitor
{
@ -481,8 +494,8 @@ namespace mu_coin {
void receive_action (boost::system::error_code const &, size_t);
void send_keepalive (mu_coin::endpoint const &);
void publish_block (mu_coin::endpoint const &, std::unique_ptr <mu_coin::block>);
void add_publish_ack_listener (mu_coin::block_hash const &, session const &);
void remove_publish_ack_listener (mu_coin::block_hash const &);
void add_publish_listener (mu_coin::block_hash const &, session const &);
void remove_publish_listener (mu_coin::block_hash const &);
mu_coin::endpoint remote;
std::array <uint8_t, 4000> buffer;
boost::asio::ip::udp::socket socket;
@ -499,7 +512,7 @@ namespace mu_coin {
bool on;
private:
std::mutex mutex;
std::unordered_map <mu_coin::block_hash, session> publish_ack_listeners;
std::unordered_map <mu_coin::block_hash, session> publish_listeners;
};
class peer_container
{
@ -515,6 +528,7 @@ namespace mu_coin {
public:
client (boost::asio::io_service &, uint16_t, boost::filesystem::path const &, boost::filesystem::path const &, mu_coin::processor_service &);
client (boost::asio::io_service &, uint16_t, mu_coin::processor_service &);
void publish (std::unique_ptr <mu_coin::block>);
mu_coin::block_store store;
mu_coin::ledger ledger;
mu_coin::wallet wallet;