diff --git a/mu_coin/mu_coin.cpp b/mu_coin/mu_coin.cpp index e54d908e..6eda695a 100644 --- a/mu_coin/mu_coin.cpp +++ b/mu_coin/mu_coin.cpp @@ -1443,36 +1443,64 @@ public: bool result; }; +class receivable_processor; class receivable_message_processor : public mu_coin::message_visitor { public: + receivable_message_processor (receivable_processor & processor_a) : + processor (processor_a) + { + } + receivable_processor & processor; + void keepalive_req (mu_coin::keepalive_req const &) + { + assert (false); + } + void keepalive_ack (mu_coin::keepalive_ack const &) + { + assert (false); + } + void publish_req (mu_coin::publish_req const &) + { + assert (false); + } + void publish_con (mu_coin::publish_con const &); + void publish_dup (mu_coin::publish_dup const &); + void publish_unk (mu_coin::publish_unk const &); + void publish_nak (mu_coin::publish_nak const &); + void process_authorizations (mu_coin::block_hash const &, std::vector const &); }; class receivable_processor : public std::enable_shared_from_this { public: receivable_processor (std::unique_ptr incoming_a, mu_coin::client & client_a) : + threshold (client_a.ledger.supply () / 2), incoming (std::move (incoming_a)), - client (client_a) + client (client_a), + complete (false) { } void run () { auto this_l (shared_from_this ()); - timeout = std::chrono::system_clock::now () + std::chrono::seconds (5); + advance_timeout (); client.processor.service.add (timeout, [this_l] () {this_l->timeout_action ();}); - client.network.add_publish_ack_listener (incoming->block->hash (), [this_l] (std::unique_ptr message_a, mu_coin::endpoint const & endpoint_a) {this_l->publish_ack (std::move (message_a), endpoint_a);}); + client.network.add_publish_listener (incoming->block->hash (), [this_l] (std::unique_ptr message_a, mu_coin::endpoint const & endpoint_a) {this_l->publish_ack (std::move (message_a), endpoint_a);}); auto peers (client.peers.list ()); for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i) { client.network.publish_block (*i, incoming->block->clone ()); } } - void publish_ack (std::unique_ptr message, mu_coin::endpoint const &) + void publish_ack (std::unique_ptr message, mu_coin::endpoint const & source) { + receivable_message_processor processor_l (*this); + message->visit (processor_l); } void timeout_action () { + std::lock_guard lock (mutex); if (timeout < std::chrono::system_clock::now ()) { @@ -1482,11 +1510,119 @@ public: // Timeout signals may be invalid if we've received action since they were queued } } + void advance_timeout () + { + timeout = std::chrono::system_clock::now () + std::chrono::seconds (5); + } + mu_coin::uint256_t acknowledged; + mu_coin::uint256_t nacked; + mu_coin::uint256_t threshold; std::chrono::system_clock::time_point timeout; std::unique_ptr incoming; mu_coin::client & client; + std::mutex mutex; + bool complete; }; +void receivable_message_processor::process_authorizations (mu_coin::block_hash const & block, std::vector const & authorizations) +{ + mu_coin::uint256_t acknowledged; + for (auto i (authorizations.begin ()), j (authorizations.end ()); i != j; ++i) + { + if (!mu_coin::validate_message (i->address, block, i->signature)) + { + auto balance (processor.client.ledger.balance (i->address)); + acknowledged += balance; + } + else + { + // Signature didn't match. + } + } + std::unique_lock lock (processor.mutex); + if (!processor.complete) + { + processor.acknowledged += acknowledged; + if (processor.acknowledged > processor.threshold && processor.nacked.is_zero ()) + { + processor.complete = true; + lock.release (); + assert (dynamic_cast (processor.incoming->block.get ()) != nullptr); + auto & send (static_cast (*processor.incoming->block.get ())); + auto hash (send.hash ()); + for (auto i (send.outputs.begin ()), j (send.outputs.end ()); i != j; ++i) + { + mu_coin::private_key prv; + mu_coin::secret_key key; + if (!processor.client.wallet.fetch (i->destination, key, prv)) + { + if (!processor.client.ledger.store.pending_get (i->destination, hash)) + { + mu_coin::block_hash previous; + processor.client.ledger.store.latest_get (i->destination, previous); + auto receive (new mu_coin::receive_block); + receive->previous = previous; + receive->source = hash; + mu_coin::sign_message (prv, i->destination, receive->hash (), receive->signature); + prv.bytes.fill (0); + if (!processor.client.ledger.process (*receive)) + { + processor.client.publish (std::unique_ptr (receive)); + } + else + { + assert (false); // Internal error, our own ledger doesn't accept the block, let's forget this ever happened... + } + } + else + { + // Ledger doesn't have this marked as available to receive anymore + } + } + else + { + // Wallet doesn't contain key for this destination or couldn't decrypt + } + } + } + else + { + processor.advance_timeout (); + } + } +} + +void receivable_message_processor::publish_con (mu_coin::publish_con const & message) +{ + process_authorizations (message.block, message.authorizations); +} + +void receivable_message_processor::publish_dup (mu_coin::publish_dup const & message) +{ + process_authorizations (message.block, message.authorizations); +} + +void receivable_message_processor::publish_unk (mu_coin::publish_unk const & message) +{ +} + +void receivable_message_processor::publish_nak (mu_coin::publish_nak const & message) +{ + auto block (message.block->hash ()); + for (auto i (message.authorizations.begin ()), j (message.authorizations.end ()); i != j; ++i) + { + if (!mu_coin::validate_message (i->address, block, i->signature)) + { + auto balance (processor.client.ledger.balance (i->address)); + processor.nacked += balance; + } + else + { + // Signature didn't match. + } + } +} + void mu_coin::processor::process_receivable (std::unique_ptr incoming) { auto processor (std::make_shared (std::move (incoming), client)); @@ -1518,16 +1654,16 @@ std::vector mu_coin::peer_container::list () return result; } -void mu_coin::network::add_publish_ack_listener (mu_coin::block_hash const & block_a, session const & function_a) +void mu_coin::network::add_publish_listener (mu_coin::block_hash const & block_a, session const & function_a) { std::lock_guard lock (mutex); - publish_ack_listeners [block_a] = function_a; + publish_listeners [block_a] = function_a; } -void mu_coin::network::remove_publish_ack_listener (mu_coin::block_hash const & block_a) +void mu_coin::network::remove_publish_listener (mu_coin::block_hash const & block_a) { std::lock_guard lock (mutex); - publish_ack_listeners.erase (block_a); + publish_listeners.erase (block_a); } void mu_coin::keepalive_req::visit (mu_coin::message_visitor & visitor_a) @@ -1615,4 +1751,14 @@ bool mu_coin::keepalive_req::deserialize (mu_coin::byte_read_stream & stream) auto result (stream.read (type)); assert (type == mu_coin::message_type::keepalive_req); return result; +} + +void mu_coin::client::publish (std::unique_ptr block) +{ + auto list (peers.list ()); +} + +mu_coin::uint256_t mu_coin::ledger::supply () +{ + return std::numeric_limits ::max (); } \ No newline at end of file diff --git a/mu_coin/mu_coin.hpp b/mu_coin/mu_coin.hpp index 605b7ae3..42477d58 100644 --- a/mu_coin/mu_coin.hpp +++ b/mu_coin/mu_coin.hpp @@ -100,6 +100,7 @@ namespace mu_coin { void clear (); boost::multiprecision::uint512_t number (); }; + using signature = uint512_union; using endpoint = boost::asio::ip::udp::endpoint; } @@ -300,6 +301,7 @@ namespace mu_coin { public: ledger (mu_coin::block_store &); mu_coin::uint256_t balance (mu_coin::address const &); + mu_coin::uint256_t supply (); mu_coin::process_result process (mu_coin::block const &); mu_coin::block_store & store; }; @@ -338,6 +340,12 @@ namespace mu_coin { publish_unk, publish_nak }; + class authorization + { + public: + mu_coin::address address; + mu_coin::signature signature; + }; class message_visitor; class message { @@ -375,6 +383,8 @@ namespace mu_coin { bool deserialize (mu_coin::byte_read_stream &); void serialize (mu_coin::byte_write_stream &); void visit (mu_coin::message_visitor &) override; + mu_coin::block_hash block; + std::vector authorizations; }; class publish_dup : public message { @@ -382,6 +392,8 @@ namespace mu_coin { bool deserialize (mu_coin::byte_read_stream &); void serialize (mu_coin::byte_write_stream &); void visit (mu_coin::message_visitor &) override; + mu_coin::block_hash block; + std::vector authorizations; }; class publish_unk : public message { @@ -397,6 +409,7 @@ namespace mu_coin { void serialize (mu_coin::byte_write_stream &); void visit (mu_coin::message_visitor &) override; std::unique_ptr block; // Observed fork block + std::vector authorizations; }; class message_visitor { @@ -481,8 +494,8 @@ namespace mu_coin { void receive_action (boost::system::error_code const &, size_t); void send_keepalive (mu_coin::endpoint const &); void publish_block (mu_coin::endpoint const &, std::unique_ptr ); - void add_publish_ack_listener (mu_coin::block_hash const &, session const &); - void remove_publish_ack_listener (mu_coin::block_hash const &); + void add_publish_listener (mu_coin::block_hash const &, session const &); + void remove_publish_listener (mu_coin::block_hash const &); mu_coin::endpoint remote; std::array buffer; boost::asio::ip::udp::socket socket; @@ -499,7 +512,7 @@ namespace mu_coin { bool on; private: std::mutex mutex; - std::unordered_map publish_ack_listeners; + std::unordered_map publish_listeners; }; class peer_container { @@ -515,6 +528,7 @@ namespace mu_coin { public: client (boost::asio::io_service &, uint16_t, boost::filesystem::path const &, boost::filesystem::path const &, mu_coin::processor_service &); client (boost::asio::io_service &, uint16_t, mu_coin::processor_service &); + void publish (std::unique_ptr ); mu_coin::block_store store; mu_coin::ledger ledger; mu_coin::wallet wallet;