From 2cd53b073ec80148e47b36802eb4123e73bbaa32 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Thu, 30 Oct 2014 22:53:45 -0500 Subject: [PATCH] Adding stuff for new header format. --- rai/core/core.cpp | 188 ++++++++++++++++++++++++++++--------------- rai/core/core.hpp | 4 +- rai/test/message.cpp | 5 +- rai/test/network.cpp | 8 +- 4 files changed, 135 insertions(+), 70 deletions(-) diff --git a/rai/core/core.cpp b/rai/core/core.cpp index 251bcc39..6d5e854d 100644 --- a/rai/core/core.cpp +++ b/rai/core/core.cpp @@ -49,7 +49,7 @@ namespace } bool constexpr log_to_cerr () { - return true; + return false; } } @@ -232,7 +232,8 @@ void rai::network::receive_action (boost::system::error_code const & error, size uint8_t version_using; uint8_t version_min; rai::message_type type; - if (!rai::message::read_header (header_stream, version_max, version_using, version_min, type)) + std::bitset <64> extensions; + if (!rai::message::read_header (header_stream, version_max, version_using, version_min, type, extensions)) { auto sender (remote); maintain_keepalive (sender); @@ -409,9 +410,14 @@ block (std::move (block_a)) bool rai::publish::deserialize (rai::stream & stream_a) { - rai::message_type type; - auto result (read (stream_a, type)); + uint8_t version_max; + uint8_t version_using; + uint8_t version_min; + rai::message_type type; + std::bitset <64> extensions; + auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions)); assert (!result); + assert (type == rai::message_type::publish); if (!result) { result = read (stream_a, work); @@ -426,7 +432,7 @@ bool rai::publish::deserialize (rai::stream & stream_a) void rai::publish::serialize (rai::stream & stream_a) { - write (stream_a, rai::message_type::publish); + write_header (stream_a); write (stream_a, work); rai::serialize_block (stream_a, *block); } @@ -1320,7 +1326,7 @@ void rai::keepalive::visit (rai::message_visitor & visitor_a) const void rai::keepalive::serialize (rai::stream & stream_a) { - write (stream_a, rai::message_type::keepalive); + write_header (stream_a); for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i) { assert (i->address ().is_v6 ()); @@ -1333,8 +1339,13 @@ void rai::keepalive::serialize (rai::stream & stream_a) bool rai::keepalive::deserialize (rai::stream & stream_a) { - rai::message_type type; - auto result (read (stream_a, type)); + uint8_t version_max; + uint8_t version_using; + uint8_t version_min; + rai::message_type type; + std::bitset <64> extensions; + auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions)); + assert (!result); assert (type == rai::message_type::keepalive); for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i) { @@ -1457,8 +1468,13 @@ message (rai::message_type::confirm_ack) bool rai::confirm_ack::deserialize (rai::stream & stream_a) { - rai::message_type type; - auto result (read (stream_a, type)); + uint8_t version_max; + uint8_t version_using; + uint8_t version_min; + rai::message_type type; + std::bitset <64> extensions; + auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions)); + assert (!result); assert (type == rai::message_type::confirm_ack); if (!result) { @@ -1482,7 +1498,7 @@ bool rai::confirm_ack::deserialize (rai::stream & stream_a) void rai::confirm_ack::serialize (rai::stream & stream_a) { - write (stream_a, rai::message_type::confirm_ack); + write_header (stream_a); write (stream_a, vote.address); write (stream_a, vote.signature); write (stream_a, vote.sequence); @@ -1507,14 +1523,22 @@ message (rai::message_type::confirm_req) bool rai::confirm_req::deserialize (rai::stream & stream_a) { - rai::message_type type; - read (stream_a, type); + uint8_t version_max; + uint8_t version_using; + uint8_t version_min; + rai::message_type type; + std::bitset <64> extensions; + auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions)); + assert (!result); assert (type == rai::message_type::confirm_req); - auto result (read (stream_a, work)); if (!result) - { - block = rai::deserialize_block (stream_a); - result = block == nullptr; + { + result = read (stream_a, work); + if (!result) + { + block = rai::deserialize_block (stream_a); + result = block == nullptr; + } } return result; } @@ -1526,10 +1550,18 @@ message (rai::message_type::confirm_unk) bool rai::confirm_unk::deserialize (rai::stream & stream_a) { - rai::message_type type; - read (stream_a, type); + uint8_t version_max; + uint8_t version_using; + uint8_t version_min; + rai::message_type type; + std::bitset <64> extensions; + auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions)); + assert (!result); assert (type == rai::message_type::confirm_unk); - auto result (read (stream_a, rep_hint)); + if (!result) + { + result = read (stream_a, rep_hint); + } return result; } @@ -1546,7 +1578,7 @@ void rai::confirm_unk::visit (rai::message_visitor & visitor_a) const void rai::confirm_req::serialize (rai::stream & stream_a) { assert (block != nullptr); - write (stream_a, rai::message_type::confirm_req); + write_header (stream_a); write (stream_a, work); rai::serialize_block (stream_a, *block); } @@ -1799,6 +1831,7 @@ void rai::block_store::representation_put (rai::address const & address_a, rai:: void rai::confirm_unk::serialize (rai::stream & stream_a) { + write_header (stream_a); write (stream_a, rep_hint); } @@ -1998,8 +2031,14 @@ void rai::bulk_req::visit (rai::message_visitor & visitor_a) const bool rai::bulk_req::deserialize (rai::stream & stream_a) { - rai::message_type type; - auto result (read (stream_a, type)); + uint8_t version_max; + uint8_t version_using; + uint8_t version_min; + rai::message_type type; + std::bitset <64> extensions; + auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions)); + assert (!result); + assert (rai::message_type::bulk_req == type); if (!result) { assert (type == rai::message_type::bulk_req); @@ -2014,7 +2053,7 @@ bool rai::bulk_req::deserialize (rai::stream & stream_a) void rai::bulk_req::serialize (rai::stream & stream_a) { - write (stream_a, rai::message_type::bulk_req); + write_header (stream_a); write (stream_a, start); write (stream_a, end); } @@ -2115,49 +2154,55 @@ client (client_a) void rai::bootstrap_connection::receive () { auto this_l (shared_from_this ()); - boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data (), 1), [this_l] (boost::system::error_code const & ec, size_t size_a) + boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data (), 16), [this_l] (boost::system::error_code const & ec, size_t size_a) { - this_l->receive_type_action (ec, size_a); + this_l->receive_header_action (ec, size_a); }); } -void rai::bootstrap_connection::receive_type_action (boost::system::error_code const & ec, size_t size_a) +void rai::bootstrap_connection::receive_header_action (boost::system::error_code const & ec, size_t size_a) { if (!ec) { - assert (size_a == 1); - rai::bufferstream type_stream (receive_buffer.data (), size_a); - rai::message_type type; - read (type_stream, type); - switch (type) - { - case rai::message_type::bulk_req: - { - auto this_l (shared_from_this ()); - boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data () + 1, sizeof (rai::uint256_union) + sizeof (rai::uint256_union)), [this_l] (boost::system::error_code const & ec, size_t size_a) - { - this_l->receive_bulk_req_action (ec, size_a); - }); - break; - } - case rai::message_type::frontier_req: + assert (size_a == 16); + rai::bufferstream type_stream (receive_buffer.data (), size_a); + uint8_t version_max; + uint8_t version_using; + uint8_t version_min; + rai::message_type type; + std::bitset <64> extensions; + if (!rai::message::read_header (type_stream, version_max, version_using, version_min, type, extensions)) + { + switch (type) { - auto this_l (shared_from_this ()); - boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data () + 1, sizeof (rai::uint256_union) + sizeof (uint32_t) + sizeof (uint32_t)), [this_l] (boost::system::error_code const & ec, size_t size_a) - { - this_l->receive_frontier_req_action (ec, size_a); - }); - break; + case rai::message_type::bulk_req: + { + auto this_l (shared_from_this ()); + boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data () + 16, sizeof (rai::uint256_union) + sizeof (rai::uint256_union)), [this_l] (boost::system::error_code const & ec, size_t size_a) + { + this_l->receive_bulk_req_action (ec, size_a); + }); + break; + } + case rai::message_type::frontier_req: + { + auto this_l (shared_from_this ()); + boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data () + 16, sizeof (rai::uint256_union) + sizeof (uint32_t) + sizeof (uint32_t)), [this_l] (boost::system::error_code const & ec, size_t size_a) + { + this_l->receive_frontier_req_action (ec, size_a); + }); + break; + } + default: + { + if (network_logging ()) + { + client->log.add (boost::str (boost::format ("Received invalid type from bootstrap connection %1%") % static_cast (type))); + } + break; + } } - default: - { - if (network_logging ()) - { - client->log.add (boost::str (boost::format ("Received invalid type from bootstrap connection %1%") % static_cast (type))); - } - break; - } - } + } } else { @@ -2173,7 +2218,7 @@ void rai::bootstrap_connection::receive_bulk_req_action (boost::system::error_co if (!ec) { std::unique_ptr request (new rai::bulk_req); - rai::bufferstream stream (receive_buffer.data (), sizeof (rai::message_type) + sizeof (rai::uint256_union) + sizeof (rai::uint256_union)); + rai::bufferstream stream (receive_buffer.data (), 16 + sizeof (rai::uint256_union) + sizeof (rai::uint256_union)); auto error (request->deserialize (stream)); if (!error) { @@ -2192,7 +2237,7 @@ void rai::bootstrap_connection::receive_frontier_req_action (boost::system::erro if (!ec) { std::unique_ptr request (new rai::frontier_req); - rai::bufferstream stream (receive_buffer.data (), sizeof (rai::message_type) + sizeof (rai::uint256_union) + sizeof (uint32_t) + sizeof (uint32_t)); + rai::bufferstream stream (receive_buffer.data (), 16 + sizeof (rai::uint256_union) + sizeof (uint32_t) + sizeof (uint32_t)); auto error (request->deserialize (stream)); if (!error) { @@ -3135,8 +3180,14 @@ message (rai::message_type::frontier_req) bool rai::frontier_req::deserialize (rai::stream & stream_a) { - rai::message_type type; - auto result (read (stream_a, type)); + uint8_t version_max; + uint8_t version_using; + uint8_t version_min; + rai::message_type type; + std::bitset <64> extensions; + auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions)); + assert (!result); + assert (rai::message_type::frontier_req == type); if (!result) { assert (type == rai::message_type::frontier_req); @@ -3155,7 +3206,7 @@ bool rai::frontier_req::deserialize (rai::stream & stream_a) void rai::frontier_req::serialize (rai::stream & stream_a) { - write (stream_a, rai::message_type::frontier_req); + write_header (stream_a); write (stream_a, start.bytes); write (stream_a, age); write (stream_a, count); @@ -3992,7 +4043,7 @@ void rai::message::write_header (rai::stream & stream_a) rai::write (stream_a, extensions.to_ullong ()); } -bool rai::message::read_header (rai::stream & stream_a, uint8_t & version_max_a, uint8_t & version_using_a, uint8_t & version_min_a, rai::message_type & type_a) +bool rai::message::read_header (rai::stream & stream_a, uint8_t & version_max_a, uint8_t & version_using_a, uint8_t & version_min_a, rai::message_type & type_a, std::bitset <64> & extensions_a) { uint32_t magic_number_l; auto result (rai::read (stream_a, magic_number_l)); @@ -4011,6 +4062,15 @@ bool rai::message::read_header (rai::stream & stream_a, uint8_t & version_max_a, if (!result) { result = rai::read (stream_a, type_a); + if (!result) + { + uint64_t extensions_l; + result = rai::read (stream_a, extensions_l); + if (!result) + { + extensions_a = extensions_l; + } + } } } } diff --git a/rai/core/core.hpp b/rai/core/core.hpp index b35e3d11..d99c899c 100644 --- a/rai/core/core.hpp +++ b/rai/core/core.hpp @@ -141,7 +141,7 @@ namespace rai { message (rai::message_type); virtual ~message () = default; void write_header (rai::stream &); - static bool read_header (rai::stream &, uint8_t &, uint8_t &, uint8_t &, rai::message_type &); + static bool read_header (rai::stream &, uint8_t &, uint8_t &, uint8_t &, rai::message_type &, std::bitset <64> &); virtual void serialize (rai::stream &) = 0; virtual bool deserialize (rai::stream &) = 0; virtual void visit (rai::message_visitor &) const = 0; @@ -503,7 +503,7 @@ namespace rai { bootstrap_connection (std::shared_ptr , std::shared_ptr ); ~bootstrap_connection (); void receive (); - void receive_type_action (boost::system::error_code const &, size_t); + void receive_header_action (boost::system::error_code const &, size_t); void receive_bulk_req_action (boost::system::error_code const &, size_t); void receive_frontier_req_action (boost::system::error_code const &, size_t); void add_request (std::unique_ptr ); diff --git a/rai/test/message.cpp b/rai/test/message.cpp index fbf739cf..d8071a7e 100644 --- a/rai/test/message.cpp +++ b/rai/test/message.cpp @@ -28,9 +28,10 @@ TEST (message, keepalive_deserialize) uint8_t version_max; uint8_t version_using; uint8_t version_min; - rai::message_type type; + rai::message_type type; + std::bitset <64> extensions; rai::bufferstream header_stream (bytes.data (), bytes.size ()); - ASSERT_FALSE (rai::message::read_header (header_stream, version_max, version_using, version_min, type)); + ASSERT_FALSE (rai::message::read_header (header_stream, version_max, version_using, version_min, type, extensions)); ASSERT_EQ (rai::message_type::keepalive, type); rai::keepalive message2; rai::bufferstream stream (bytes.data (), bytes.size ()); diff --git a/rai/test/network.cpp b/rai/test/network.cpp index 597efbd3..ea4511cc 100644 --- a/rai/test/network.cpp +++ b/rai/test/network.cpp @@ -31,8 +31,9 @@ TEST (publish, serialization) uint8_t version_max; uint8_t version_using; uint8_t version_min; - rai::message_type type; - ASSERT_FALSE (rai::message::read_header (stream, version_max, version_using, version_min, type)); + rai::message_type type; + std::bitset <64> extensions; + ASSERT_FALSE (rai::message::read_header (stream, version_max, version_using, version_min, type, extensions)); ASSERT_EQ (0x01, version_min); ASSERT_EQ (0x01, version_using); ASSERT_EQ (0x01, version_max); @@ -646,10 +647,13 @@ TEST (network, receive_weight_change) system.clients [1]->wallet.insert (key2.prv); system.clients [1]->representative = key2.pub; ASSERT_FALSE (system.clients [0]->transactions.send (key2.pub, 2)); + auto iterations (0); while (std::any_of (system.clients.begin (), system.clients.end (), [&] (std::shared_ptr const & client_a) {return client_a->ledger.weight (key2.pub) != 2;})) { system.service->poll_one (); system.processor.poll_one (); + ++iterations; + ASSERT_LT (iterations, 200); } }