diff --git a/mu_coin/mu_coin.cpp b/mu_coin/mu_coin.cpp index 6eda695a..c581151e 100644 --- a/mu_coin/mu_coin.cpp +++ b/mu_coin/mu_coin.cpp @@ -1043,10 +1043,11 @@ void mu_coin::network::receive_action (boost::system::error_code const & error, receive (); if (!error) { + auto hash (incoming->block->hash ()); auto error (client.processor.process_publish (std::unique_ptr (incoming))); if (!error) { - mu_coin::publish_con outgoing; + mu_coin::publish_con outgoing {hash}; mu_coin::byte_write_stream stream; outgoing.serialize (stream); auto data (stream.data); @@ -1443,15 +1444,14 @@ public: bool result; }; -class receivable_processor; class receivable_message_processor : public mu_coin::message_visitor { public: - receivable_message_processor (receivable_processor & processor_a) : + receivable_message_processor (mu_coin::receivable_processor & processor_a) : processor (processor_a) { } - receivable_processor & processor; + mu_coin::receivable_processor & processor; void keepalive_req (mu_coin::keepalive_req const &) { assert (false); @@ -1471,58 +1471,51 @@ public: void process_authorizations (mu_coin::block_hash const &, std::vector const &); }; -class receivable_processor : public std::enable_shared_from_this +mu_coin::receivable_processor::receivable_processor (std::unique_ptr incoming_a, mu_coin::client & client_a) : +threshold (client_a.ledger.supply () / 2), +incoming (std::move (incoming_a)), +client (client_a), +complete (false) { -public: - receivable_processor (std::unique_ptr incoming_a, mu_coin::client & client_a) : - threshold (client_a.ledger.supply () / 2), - incoming (std::move (incoming_a)), - client (client_a), - complete (false) +} + +void mu_coin::receivable_processor::run () +{ + auto this_l (shared_from_this ()); + advance_timeout (); + client.network.add_publish_listener (incoming->block->hash (), [this_l] (std::unique_ptr message_a, mu_coin::endpoint const & endpoint_a) {this_l->publish_con (std::move (message_a), endpoint_a);}); + auto peers (client.peers.list ()); + for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i) { + client.network.publish_block (*i, incoming->block->clone ()); } - void run () +} + +void mu_coin::receivable_processor::publish_con (std::unique_ptr message, const mu_coin::endpoint &source) +{ + receivable_message_processor processor_l (*this); + message->visit (processor_l); +} + +void mu_coin::receivable_processor::advance_timeout () +{ + auto this_l (shared_from_this ()); + timeout = std::chrono::system_clock::now () + std::chrono::seconds (5); + client.processor.service.add (timeout, [this_l] () {this_l->timeout_action ();}); +} + +void mu_coin::receivable_processor::timeout_action () +{ + std::lock_guard lock (mutex); + if (timeout < std::chrono::system_clock::now ()) { - auto this_l (shared_from_this ()); - advance_timeout (); - client.processor.service.add (timeout, [this_l] () {this_l->timeout_action ();}); - client.network.add_publish_listener (incoming->block->hash (), [this_l] (std::unique_ptr message_a, mu_coin::endpoint const & endpoint_a) {this_l->publish_ack (std::move (message_a), endpoint_a);}); - auto peers (client.peers.list ()); - for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i) - { - client.network.publish_block (*i, incoming->block->clone ()); - } + } - void publish_ack (std::unique_ptr message, mu_coin::endpoint const & source) + else { - receivable_message_processor processor_l (*this); - message->visit (processor_l); + // Timeout signals may be invalid if we've received action since they were queued } - void timeout_action () - { - std::lock_guard lock (mutex); - if (timeout < std::chrono::system_clock::now ()) - { - - } - else - { - // Timeout signals may be invalid if we've received action since they were queued - } - } - void advance_timeout () - { - timeout = std::chrono::system_clock::now () + std::chrono::seconds (5); - } - mu_coin::uint256_t acknowledged; - mu_coin::uint256_t nacked; - mu_coin::uint256_t threshold; - std::chrono::system_clock::time_point timeout; - std::unique_ptr incoming; - mu_coin::client & client; - std::mutex mutex; - bool complete; -}; +} void receivable_message_processor::process_authorizations (mu_coin::block_hash const & block, std::vector const & authorizations) { @@ -1761,4 +1754,21 @@ void mu_coin::client::publish (std::unique_ptr block) mu_coin::uint256_t mu_coin::ledger::supply () { return std::numeric_limits ::max (); +} + +size_t mu_coin::processor_service::size () +{ + std::lock_guard lock (mutex); + return operations.size (); +} + +size_t mu_coin::network::publish_listener_size () +{ + std::lock_guard lock (mutex); + return publish_listeners.size (); +} + +mu_coin::publish_con::publish_con (mu_coin::block_hash const & block_a) : +block (block_a) +{ } \ No newline at end of file diff --git a/mu_coin/mu_coin.hpp b/mu_coin/mu_coin.hpp index 42477d58..73081629 100644 --- a/mu_coin/mu_coin.hpp +++ b/mu_coin/mu_coin.hpp @@ -380,6 +380,7 @@ namespace mu_coin { class publish_con : public message { public: + publish_con (mu_coin::block_hash const &); bool deserialize (mu_coin::byte_read_stream &); void serialize (mu_coin::byte_write_stream &); void visit (mu_coin::message_visitor &) override; @@ -468,6 +469,7 @@ namespace mu_coin { void add (std::chrono::system_clock::time_point const &, std::function const &); void stop (); bool stopped (); + size_t size (); private: bool done; std::mutex mutex; @@ -496,6 +498,7 @@ namespace mu_coin { void publish_block (mu_coin::endpoint const &, std::unique_ptr ); void add_publish_listener (mu_coin::block_hash const &, session const &); void remove_publish_listener (mu_coin::block_hash const &); + size_t publish_listener_size (); mu_coin::endpoint remote; std::array buffer; boost::asio::ip::udp::socket socket; @@ -522,6 +525,23 @@ namespace mu_coin { private: std::mutex mutex; std::unordered_set peers; + }; + class receivable_processor : public std::enable_shared_from_this + { + public: + receivable_processor (std::unique_ptr incoming_a, mu_coin::client & client_a); + void run (); + void publish_con (std::unique_ptr message, mu_coin::endpoint const & source); + void timeout_action (); + void advance_timeout (); + mu_coin::uint256_t acknowledged; + mu_coin::uint256_t nacked; + mu_coin::uint256_t threshold; + std::chrono::system_clock::time_point timeout; + std::unique_ptr incoming; + mu_coin::client & client; + std::mutex mutex; + bool complete; }; class client { diff --git a/mu_coin_test/test_network.cpp b/mu_coin_test/test_network.cpp index 1382fa14..decc49b2 100644 --- a/mu_coin_test/test_network.cpp +++ b/mu_coin_test/test_network.cpp @@ -94,14 +94,17 @@ TEST (network, send_valid_publish) { boost::asio::io_service service; mu_coin::processor_service processor; + mu_coin::secret_key secret; mu_coin::keypair key1; mu_coin::client client1 (service, 24001, processor); + client1.wallet.insert (key1.pub, key1.prv, secret); client1.store.genesis_put (key1.pub, 100); + mu_coin::keypair key2; mu_coin::client client2 (service, 24002, processor); + client2.wallet.insert (key2.pub, key2.prv, secret); client2.store.genesis_put (key1.pub, 100); client1.network.receive (); client2.network.receive (); - mu_coin::keypair key2; mu_coin::send_block block2; mu_coin::block_hash hash1; ASSERT_FALSE (client1.store.latest_get (key1.pub, hash1)); @@ -113,15 +116,79 @@ TEST (network, send_valid_publish) mu_coin::block_hash hash3; ASSERT_FALSE (client2.store.latest_get (key1.pub, hash3)); client1.network.publish_block (client2.network.socket.local_endpoint (), std::unique_ptr (new mu_coin::send_block (block2))); - while (client1.network.publish_con_count == 0) + while (client2.network.publish_con_count == 0) { service.run_one (); } - ASSERT_EQ (1, client2.network.publish_req_count); ASSERT_EQ (1, client1.network.publish_con_count); + ASSERT_EQ (1, client2.network.publish_con_count); + ASSERT_EQ (1, client1.network.publish_req_count); + ASSERT_EQ (1, client2.network.publish_req_count); mu_coin::block_hash hash4; ASSERT_FALSE (client2.store.latest_get (key1.pub, hash4)); ASSERT_FALSE (hash3 == hash4); ASSERT_EQ (hash2, hash4); ASSERT_EQ (49, client2.ledger.balance (key1.pub)); +} + +TEST (receivable_processor, timeout) +{ + boost::asio::io_service io_service; + mu_coin::processor_service processor; + mu_coin::client client (io_service, 24001, processor); + auto receivable (std::make_shared (nullptr, client)); + ASSERT_EQ (0, client.network.publish_listener_size ()); + ASSERT_FALSE (receivable->complete); + ASSERT_EQ (0, processor.size ()); + receivable->advance_timeout (); + ASSERT_EQ (1, processor.size ()); + receivable->advance_timeout (); + ASSERT_EQ (2, processor.size ()); +} + +TEST (receivable_processor, confirm_no_pos) +{ + boost::asio::io_service io_service; + mu_coin::processor_service processor; + mu_coin::client client1 (io_service, 24001, processor); + auto block1 (new mu_coin::send_block ()); + auto receivable (std::make_shared (std::unique_ptr {new mu_coin::publish_req {std::unique_ptr {block1}}}, client1)); + receivable->run (); + ASSERT_EQ (1, client1.network.publish_listener_size ()); + mu_coin::keypair key1; + mu_coin::publish_con con1 {block1->hash ()}; + mu_coin::authorization auth1; + auth1.address = key1.pub; + mu_coin::sign_message (key1.prv, key1.pub, con1.block, auth1.signature); + con1.authorizations.push_back (auth1); + mu_coin::byte_write_stream stream; + con1.serialize (stream); + ASSERT_LE (stream.size, client1.network.buffer.size ()); + std::copy (stream.data, stream.data + stream.size, client1.network.buffer.begin ()); + client1.network.receive_action (boost::system::error_code {}, stream.size); + ASSERT_TRUE (receivable->acknowledged.is_zero ()); +} + +TEST (receivable_processor, confirm_insufficient_pos) +{ + boost::asio::io_service io_service; + mu_coin::processor_service processor; + mu_coin::client client1 (io_service, 24001, processor); + mu_coin::keypair key1; + client1.ledger.store.genesis_put (key1.pub, 1); + auto block1 (new mu_coin::send_block ()); + auto receivable (std::make_shared (std::unique_ptr {new mu_coin::publish_req {std::unique_ptr {block1}}}, client1)); + receivable->run (); + ASSERT_EQ (1, client1.network.publish_listener_size ()); + mu_coin::publish_con con1 {block1->hash ()}; + mu_coin::authorization auth1; + auth1.address = key1.pub; + mu_coin::sign_message (key1.prv, key1.pub, con1.block, auth1.signature); + con1.authorizations.push_back (auth1); + mu_coin::byte_write_stream stream; + con1.serialize (stream); + ASSERT_LE (stream.size, client1.network.buffer.size ()); + std::copy (stream.data, stream.data + stream.size, client1.network.buffer.begin ()); + client1.network.receive_action (boost::system::error_code {}, stream.size); + ASSERT_EQ (1, receivable->acknowledged); } \ No newline at end of file