From aeebb43d9571af54a5c91c8bb7240f086396de08 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Mon, 27 Oct 2014 20:05:50 -0500 Subject: [PATCH] Unifying keepalive. --- rai/core/core.cpp | 146 +++++++++---------------------------------- rai/core/core.hpp | 21 ++----- rai/test/block.cpp | 4 +- rai/test/network.cpp | 40 ++++-------- 4 files changed, 49 insertions(+), 162 deletions(-) diff --git a/rai/core/core.cpp b/rai/core/core.cpp index 8fc443a0..311ba411 100644 --- a/rai/core/core.cpp +++ b/rai/core/core.cpp @@ -93,8 +93,7 @@ socket (service_a, boost::asio::ip::udp::endpoint (boost::asio::ip::address_v4:: service (service_a), resolver (service_a), client (client_a), -keepalive_req_count (0), -keepalive_ack_count (0), +keepalive_count (0), publish_req_count (0), confirm_req_count (0), confirm_ack_count (0), @@ -128,8 +127,9 @@ void rai::network::maintain_keepalive (boost::asio::ip::udp::endpoint const & en { if (!client.peers.contacting_peer (endpoint_a) && endpoint_a != endpoint ()) { - rai::keepalive_req message; + rai::keepalive message; client.peers.random_fill (message.peers); + message.checksum = client.ledger.checksum (0, std::numeric_limits ::max ()); std::shared_ptr > bytes (new std::vector ); { rai::vectorstream stream (*bytes); @@ -228,32 +228,15 @@ void rai::network::receive_action (boost::system::error_code const & error, size read (type_stream, type); switch (type) { - case rai::message_type::keepalive_req: + case rai::message_type::keepalive: { - rai::keepalive_req incoming; + rai::keepalive incoming; rai::bufferstream stream (buffer.data (), size_a); auto error (incoming.deserialize (stream)); receive (); if (!error) { - ++keepalive_req_count; - client.processor.process_message (incoming, sender); - } - else - { - ++error_count; - } - break; - } - case rai::message_type::keepalive_ack: - { - rai::keepalive_ack incoming; - rai::bufferstream stream (buffer.data (), size_a); - auto error (incoming.deserialize (stream)); - receive (); - if (!error) - { - ++keepalive_ack_count; + ++keepalive_count; client.processor.process_message (incoming, sender); } else @@ -1291,14 +1274,9 @@ std::vector rai::peer_container::list () return result; } -void rai::keepalive_req::visit (rai::message_visitor & visitor_a) const +void rai::keepalive::visit (rai::message_visitor & visitor_a) const { - visitor_a.keepalive_req (*this); -} - -void rai::keepalive_ack::visit (rai::message_visitor & visitor_a) const -{ - visitor_a.keepalive_ack (*this); + visitor_a.keepalive (*this); } void rai::publish::visit (rai::message_visitor & visitor_a) const @@ -1306,9 +1284,9 @@ void rai::publish::visit (rai::message_visitor & visitor_a) const visitor_a.publish (*this); } -void rai::keepalive_ack::serialize (rai::stream & stream_a) +void rai::keepalive::serialize (rai::stream & stream_a) { - write (stream_a, rai::message_type::keepalive_ack); + write (stream_a, rai::message_type::keepalive); for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i) { uint32_t address (i->address ().to_v4 ().to_ulong ()); @@ -1318,11 +1296,11 @@ void rai::keepalive_ack::serialize (rai::stream & stream_a) write (stream_a, checksum); } -bool rai::keepalive_ack::deserialize (rai::stream & stream_a) +bool rai::keepalive::deserialize (rai::stream & stream_a) { rai::message_type type; auto result (read (stream_a, type)); - assert (type == rai::message_type::keepalive_ack); + assert (type == rai::message_type::keepalive); for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i) { uint32_t address; @@ -1335,33 +1313,6 @@ bool rai::keepalive_ack::deserialize (rai::stream & stream_a) return result; } -void rai::keepalive_req::serialize (rai::stream & stream_a) -{ - write (stream_a, rai::message_type::keepalive_req); - for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i) - { - uint32_t address (i->address ().to_v4 ().to_ulong ()); - write (stream_a, address); - write (stream_a, i->port ()); - } -} - -bool rai::keepalive_req::deserialize (rai::stream & stream_a) -{ - rai::message_type type; - auto result (read (stream_a, type)); - assert (type == rai::message_type::keepalive_req); - for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i) - { - uint32_t address; - uint16_t port; - read (stream_a, address); - read (stream_a, port); - *i = rai::endpoint (boost::asio::ip::address_v4 (address), port); - } - return result; -} - size_t rai::processor_service::size () { std::lock_guard lock (mutex); @@ -2237,36 +2188,32 @@ public: connection (connection_a) { } - void keepalive_req (rai::keepalive_req const &) + void keepalive (rai::keepalive const &) override { assert (false); } - void keepalive_ack (rai::keepalive_ack const &) + void publish (rai::publish const &) override { assert (false); } - void publish (rai::publish const &) + void confirm_req (rai::confirm_req const &) override { assert (false); } - void confirm_req (rai::confirm_req const &) + void confirm_ack (rai::confirm_ack const &) override { assert (false); } - void confirm_ack (rai::confirm_ack const &) + void confirm_unk (rai::confirm_unk const &) override { assert (false); } - void confirm_unk (rai::confirm_unk const &) - { - assert (false); - } - void bulk_req (rai::bulk_req const &) + void bulk_req (rai::bulk_req const &) override { auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); response->send_next (); } - void frontier_req (rai::frontier_req const &) + void frontier_req (rai::frontier_req const &) override { auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); response->send_next (); @@ -2413,36 +2360,32 @@ public: connection (connection_a) { } - void keepalive_req (rai::keepalive_req const &) + void keepalive (rai::keepalive const &) override { assert (false); } - void keepalive_ack (rai::keepalive_ack const &) + void publish (rai::publish const &) override { assert (false); } - void publish (rai::publish const &) + void confirm_req (rai::confirm_req const &) override { assert (false); } - void confirm_req (rai::confirm_req const &) + void confirm_ack (rai::confirm_ack const &) override { assert (false); } - void confirm_ack (rai::confirm_ack const &) + void confirm_unk (rai::confirm_unk const &) override { assert (false); } - void confirm_unk (rai::confirm_unk const &) - { - assert (false); - } - void bulk_req (rai::bulk_req const &) + void bulk_req (rai::bulk_req const &) override { auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); response->receive_block (); } - void frontier_req (rai::frontier_req const &) + void frontier_req (rai::frontier_req const &) override { auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); response->receive_frontier (); @@ -3285,7 +3228,7 @@ void rai::block_store::checksum_del (uint64_t prefix, uint8_t mask) checksum->Delete (leveldb::WriteOptions (), leveldb::Slice (reinterpret_cast (&key), sizeof (uint64_t))); } -bool rai::keepalive_ack::operator == (rai::keepalive_ack const & other_a) const +bool rai::keepalive::operator == (rai::keepalive const & other_a) const { return (peers == other_a.peers) && (checksum == other_a.checksum); } @@ -3598,42 +3541,11 @@ public: sender (sender_a) { } - void keepalive_req (rai::keepalive_req const & message_a) override + void keepalive (rai::keepalive const & message_a) override { if (network_keepalive_logging ()) { - client.log.add (boost::str (boost::format ("Received keepalive req from %1%") % sender)); - } - rai::keepalive_ack ack_message; - client.peers.random_fill (ack_message.peers); - ack_message.checksum = client.ledger.checksum (0, std::numeric_limits ::max ()); - std::shared_ptr > ack_bytes (new std::vector ); - { - rai::vectorstream stream (*ack_bytes); - ack_message.serialize (stream); - } - auto client_l (client.shared ()); - client.network.send_buffer (ack_bytes->data (), ack_bytes->size (), sender, [ack_bytes, client_l] (boost::system::error_code const & error, size_t size_a) - { - if (network_logging ()) - { - if (error) - { - client_l->log.add (boost::str (boost::format ("Error sending keepalive ack: %1%") % error.message ())); - } - } - }); - client.network.merge_peers (message_a.peers); - if (network_keepalive_logging ()) - { - client.log.add (boost::str (boost::format ("Sending keepalive ack to %1%") % sender)); - } - } - void keepalive_ack (rai::keepalive_ack const & message_a) override - { - if (network_keepalive_logging ()) - { - client.log.add (boost::str (boost::format ("Received keepalive ack from %1%") % sender)); + client.log.add (boost::str (boost::format ("Received keepalive from %1%") % sender)); } client.network.merge_peers (message_a.peers); if (message_a.checksum != client.ledger.checksum (0, std::numeric_limits ::max ())) diff --git a/rai/core/core.hpp b/rai/core/core.hpp index e6970336..c0fcf196 100644 --- a/rai/core/core.hpp +++ b/rai/core/core.hpp @@ -120,8 +120,7 @@ namespace rai { { invalid, not_a_type, - keepalive_req, - keepalive_ack, + keepalive, publish, confirm_req, confirm_ack, @@ -137,21 +136,13 @@ namespace rai { virtual void serialize (rai::stream &) = 0; virtual void visit (rai::message_visitor &) const = 0; }; - class keepalive_req : public message + class keepalive : public message { public: void visit (rai::message_visitor &) const override; bool deserialize (rai::stream &); void serialize (rai::stream &) override; - std::array peers; - }; - class keepalive_ack : public message - { - public: - void visit (rai::message_visitor &) const override; - bool deserialize (rai::stream &); - void serialize (rai::stream &) override; - bool operator == (rai::keepalive_ack const &) const; + bool operator == (rai::keepalive const &) const; std::array peers; rai::uint256_union checksum; }; @@ -220,8 +211,7 @@ namespace rai { class message_visitor { public: - virtual void keepalive_req (rai::keepalive_req const &) = 0; - virtual void keepalive_ack (rai::keepalive_ack const &) = 0; + virtual void keepalive (rai::keepalive const &) = 0; virtual void publish (rai::publish const &) = 0; virtual void confirm_req (rai::confirm_req const &) = 0; virtual void confirm_ack (rai::confirm_ack const &) = 0; @@ -457,8 +447,7 @@ namespace rai { rai::client & client; std::queue >> sends; std::mutex mutex; - uint64_t keepalive_req_count; - uint64_t keepalive_ack_count; + uint64_t keepalive_count; uint64_t publish_req_count; uint64_t confirm_req_count; uint64_t confirm_ack_count; diff --git a/rai/test/block.cpp b/rai/test/block.cpp index 672ed2a0..41b7b141 100644 --- a/rai/test/block.cpp +++ b/rai/test/block.cpp @@ -440,13 +440,13 @@ TEST (frontier_req, serialization) TEST (keepalive_ack, serialization) { - rai::keepalive_ack request1; + rai::keepalive request1; std::vector bytes; { rai::vectorstream stream (bytes); request1.serialize (stream); } - rai::keepalive_ack request2; + rai::keepalive request2; rai::bufferstream buffer (bytes.data (), bytes.size ()); ASSERT_FALSE (request2.deserialize (buffer)); ASSERT_EQ (request1, request2); diff --git a/rai/test/network.cpp b/rai/test/network.cpp index f0ca47a6..67ab2e94 100644 --- a/rai/test/network.cpp +++ b/rai/test/network.cpp @@ -63,33 +63,18 @@ TEST (network, self_discard) ASSERT_EQ (1, system.clients [0]->network.bad_sender_count); } -TEST (keepalive_req, deserialize) -{ - rai::keepalive_req message1; - rai::endpoint endpoint (boost::asio::ip::address_v4 (0x7f000001), 10000); - message1.peers [0] = endpoint; - std::vector bytes; - { - rai::vectorstream stream (bytes); - message1.serialize (stream); - } - rai::keepalive_req message2; - rai::bufferstream stream (bytes.data (), bytes.size ()); - ASSERT_FALSE (message2.deserialize (stream)); - ASSERT_EQ (message1.peers, message2.peers); -} - TEST (keepalive_ack, deserialize) { - rai::keepalive_ack message1; + rai::keepalive message1; rai::endpoint endpoint (boost::asio::ip::address_v4 (0x7f000001), 10000); message1.peers [0] = endpoint; + message1.checksum = 1; std::vector bytes; { rai::vectorstream stream (bytes); message1.serialize (stream); } - rai::keepalive_ack message2; + rai::keepalive message2; rai::bufferstream stream (bytes.data (), bytes.size ()); ASSERT_FALSE (message2.deserialize (stream)); ASSERT_EQ (message1.peers, message2.peers); @@ -97,22 +82,23 @@ TEST (keepalive_ack, deserialize) TEST (network, send_keepalive) { - rai::system system (24000, 2); + rai::system system (24000, 1); auto list1 (system.clients [0]->peers.list ()); - ASSERT_EQ (1, list1.size ()); - while (list1 [0].last_contact == std::chrono::system_clock::now ()); - system.clients [0]->network.maintain_keepalive (system.clients [1]->network.endpoint ()); - auto initial (system.clients [0]->network.keepalive_ack_count); - while (system.clients [0]->network.keepalive_ack_count == initial) + ASSERT_EQ (0, list1.size ()); + rai::client_init init1; + auto client1 (std::make_shared (init1, system.service, 24001, system.processor, rai::test_genesis_key.pub)); + client1->start (); + system.clients [0]->network.maintain_keepalive (client1->network.endpoint ()); + auto initial (system.clients [0]->network.keepalive_count); + while (system.clients [0]->network.keepalive_count == initial) { system.service->run_one (); } auto peers1 (system.clients [0]->peers.list ()); - auto peers2 (system.clients [1]->peers.list ()); + auto peers2 (client1->peers.list ()); ASSERT_EQ (1, peers1.size ()); ASSERT_EQ (1, peers2.size ()); - ASSERT_NE (peers1.end (), std::find_if (peers1.begin (), peers1.end (), [&system] (rai::peer_information const & information_a) {return information_a.endpoint == system.clients [1]->network.endpoint ();})); - ASSERT_GT (peers1 [0].last_contact, list1 [0].last_contact); + ASSERT_NE (peers1.end (), std::find_if (peers1.begin (), peers1.end (), [&client1] (rai::peer_information const & information_a) {return information_a.endpoint == client1->network.endpoint ();})); ASSERT_NE (peers2.end (), std::find_if (peers2.begin (), peers2.end (), [&system] (rai::peer_information const & information_a) {return information_a.endpoint == system.clients [0]->network.endpoint ();})); }