Adding stuff for new header format.

This commit is contained in:
clemahieu 2014-10-30 22:53:45 -05:00
commit 2cd53b073e
4 changed files with 135 additions and 70 deletions

View file

@ -49,7 +49,7 @@ namespace
}
bool constexpr log_to_cerr ()
{
return true;
return false;
}
}
@ -232,7 +232,8 @@ void rai::network::receive_action (boost::system::error_code const & error, size
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
if (!rai::message::read_header (header_stream, version_max, version_using, version_min, type))
std::bitset <64> extensions;
if (!rai::message::read_header (header_stream, version_max, version_using, version_min, type, extensions))
{
auto sender (remote);
maintain_keepalive (sender);
@ -409,9 +410,14 @@ block (std::move (block_a))
bool rai::publish::deserialize (rai::stream & stream_a)
{
rai::message_type type;
auto result (read (stream_a, type));
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
std::bitset <64> extensions;
auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions));
assert (!result);
assert (type == rai::message_type::publish);
if (!result)
{
result = read (stream_a, work);
@ -426,7 +432,7 @@ bool rai::publish::deserialize (rai::stream & stream_a)
void rai::publish::serialize (rai::stream & stream_a)
{
write (stream_a, rai::message_type::publish);
write_header (stream_a);
write (stream_a, work);
rai::serialize_block (stream_a, *block);
}
@ -1320,7 +1326,7 @@ void rai::keepalive::visit (rai::message_visitor & visitor_a) const
void rai::keepalive::serialize (rai::stream & stream_a)
{
write (stream_a, rai::message_type::keepalive);
write_header (stream_a);
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
{
assert (i->address ().is_v6 ());
@ -1333,8 +1339,13 @@ void rai::keepalive::serialize (rai::stream & stream_a)
bool rai::keepalive::deserialize (rai::stream & stream_a)
{
rai::message_type type;
auto result (read (stream_a, type));
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
std::bitset <64> extensions;
auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions));
assert (!result);
assert (type == rai::message_type::keepalive);
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
{
@ -1457,8 +1468,13 @@ message (rai::message_type::confirm_ack)
bool rai::confirm_ack::deserialize (rai::stream & stream_a)
{
rai::message_type type;
auto result (read (stream_a, type));
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
std::bitset <64> extensions;
auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions));
assert (!result);
assert (type == rai::message_type::confirm_ack);
if (!result)
{
@ -1482,7 +1498,7 @@ bool rai::confirm_ack::deserialize (rai::stream & stream_a)
void rai::confirm_ack::serialize (rai::stream & stream_a)
{
write (stream_a, rai::message_type::confirm_ack);
write_header (stream_a);
write (stream_a, vote.address);
write (stream_a, vote.signature);
write (stream_a, vote.sequence);
@ -1507,14 +1523,22 @@ message (rai::message_type::confirm_req)
bool rai::confirm_req::deserialize (rai::stream & stream_a)
{
rai::message_type type;
read (stream_a, type);
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
std::bitset <64> extensions;
auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions));
assert (!result);
assert (type == rai::message_type::confirm_req);
auto result (read (stream_a, work));
if (!result)
{
block = rai::deserialize_block (stream_a);
result = block == nullptr;
{
result = read (stream_a, work);
if (!result)
{
block = rai::deserialize_block (stream_a);
result = block == nullptr;
}
}
return result;
}
@ -1526,10 +1550,18 @@ message (rai::message_type::confirm_unk)
bool rai::confirm_unk::deserialize (rai::stream & stream_a)
{
rai::message_type type;
read (stream_a, type);
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
std::bitset <64> extensions;
auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions));
assert (!result);
assert (type == rai::message_type::confirm_unk);
auto result (read (stream_a, rep_hint));
if (!result)
{
result = read (stream_a, rep_hint);
}
return result;
}
@ -1546,7 +1578,7 @@ void rai::confirm_unk::visit (rai::message_visitor & visitor_a) const
void rai::confirm_req::serialize (rai::stream & stream_a)
{
assert (block != nullptr);
write (stream_a, rai::message_type::confirm_req);
write_header (stream_a);
write (stream_a, work);
rai::serialize_block (stream_a, *block);
}
@ -1799,6 +1831,7 @@ void rai::block_store::representation_put (rai::address const & address_a, rai::
void rai::confirm_unk::serialize (rai::stream & stream_a)
{
write_header (stream_a);
write (stream_a, rep_hint);
}
@ -1998,8 +2031,14 @@ void rai::bulk_req::visit (rai::message_visitor & visitor_a) const
bool rai::bulk_req::deserialize (rai::stream & stream_a)
{
rai::message_type type;
auto result (read (stream_a, type));
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
std::bitset <64> extensions;
auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions));
assert (!result);
assert (rai::message_type::bulk_req == type);
if (!result)
{
assert (type == rai::message_type::bulk_req);
@ -2014,7 +2053,7 @@ bool rai::bulk_req::deserialize (rai::stream & stream_a)
void rai::bulk_req::serialize (rai::stream & stream_a)
{
write (stream_a, rai::message_type::bulk_req);
write_header (stream_a);
write (stream_a, start);
write (stream_a, end);
}
@ -2115,49 +2154,55 @@ client (client_a)
void rai::bootstrap_connection::receive ()
{
auto this_l (shared_from_this ());
boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data (), 1), [this_l] (boost::system::error_code const & ec, size_t size_a)
boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data (), 16), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
this_l->receive_type_action (ec, size_a);
this_l->receive_header_action (ec, size_a);
});
}
void rai::bootstrap_connection::receive_type_action (boost::system::error_code const & ec, size_t size_a)
void rai::bootstrap_connection::receive_header_action (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
assert (size_a == 1);
rai::bufferstream type_stream (receive_buffer.data (), size_a);
rai::message_type type;
read (type_stream, type);
switch (type)
{
case rai::message_type::bulk_req:
{
auto this_l (shared_from_this ());
boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data () + 1, sizeof (rai::uint256_union) + sizeof (rai::uint256_union)), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
this_l->receive_bulk_req_action (ec, size_a);
});
break;
}
case rai::message_type::frontier_req:
assert (size_a == 16);
rai::bufferstream type_stream (receive_buffer.data (), size_a);
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
std::bitset <64> extensions;
if (!rai::message::read_header (type_stream, version_max, version_using, version_min, type, extensions))
{
switch (type)
{
auto this_l (shared_from_this ());
boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data () + 1, sizeof (rai::uint256_union) + sizeof (uint32_t) + sizeof (uint32_t)), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
this_l->receive_frontier_req_action (ec, size_a);
});
break;
case rai::message_type::bulk_req:
{
auto this_l (shared_from_this ());
boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data () + 16, sizeof (rai::uint256_union) + sizeof (rai::uint256_union)), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
this_l->receive_bulk_req_action (ec, size_a);
});
break;
}
case rai::message_type::frontier_req:
{
auto this_l (shared_from_this ());
boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data () + 16, sizeof (rai::uint256_union) + sizeof (uint32_t) + sizeof (uint32_t)), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
this_l->receive_frontier_req_action (ec, size_a);
});
break;
}
default:
{
if (network_logging ())
{
client->log.add (boost::str (boost::format ("Received invalid type from bootstrap connection %1%") % static_cast <uint8_t> (type)));
}
break;
}
}
default:
{
if (network_logging ())
{
client->log.add (boost::str (boost::format ("Received invalid type from bootstrap connection %1%") % static_cast <uint8_t> (type)));
}
break;
}
}
}
}
else
{
@ -2173,7 +2218,7 @@ void rai::bootstrap_connection::receive_bulk_req_action (boost::system::error_co
if (!ec)
{
std::unique_ptr <rai::bulk_req> request (new rai::bulk_req);
rai::bufferstream stream (receive_buffer.data (), sizeof (rai::message_type) + sizeof (rai::uint256_union) + sizeof (rai::uint256_union));
rai::bufferstream stream (receive_buffer.data (), 16 + sizeof (rai::uint256_union) + sizeof (rai::uint256_union));
auto error (request->deserialize (stream));
if (!error)
{
@ -2192,7 +2237,7 @@ void rai::bootstrap_connection::receive_frontier_req_action (boost::system::erro
if (!ec)
{
std::unique_ptr <rai::frontier_req> request (new rai::frontier_req);
rai::bufferstream stream (receive_buffer.data (), sizeof (rai::message_type) + sizeof (rai::uint256_union) + sizeof (uint32_t) + sizeof (uint32_t));
rai::bufferstream stream (receive_buffer.data (), 16 + sizeof (rai::uint256_union) + sizeof (uint32_t) + sizeof (uint32_t));
auto error (request->deserialize (stream));
if (!error)
{
@ -3135,8 +3180,14 @@ message (rai::message_type::frontier_req)
bool rai::frontier_req::deserialize (rai::stream & stream_a)
{
rai::message_type type;
auto result (read (stream_a, type));
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
std::bitset <64> extensions;
auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions));
assert (!result);
assert (rai::message_type::frontier_req == type);
if (!result)
{
assert (type == rai::message_type::frontier_req);
@ -3155,7 +3206,7 @@ bool rai::frontier_req::deserialize (rai::stream & stream_a)
void rai::frontier_req::serialize (rai::stream & stream_a)
{
write (stream_a, rai::message_type::frontier_req);
write_header (stream_a);
write (stream_a, start.bytes);
write (stream_a, age);
write (stream_a, count);
@ -3992,7 +4043,7 @@ void rai::message::write_header (rai::stream & stream_a)
rai::write (stream_a, extensions.to_ullong ());
}
bool rai::message::read_header (rai::stream & stream_a, uint8_t & version_max_a, uint8_t & version_using_a, uint8_t & version_min_a, rai::message_type & type_a)
bool rai::message::read_header (rai::stream & stream_a, uint8_t & version_max_a, uint8_t & version_using_a, uint8_t & version_min_a, rai::message_type & type_a, std::bitset <64> & extensions_a)
{
uint32_t magic_number_l;
auto result (rai::read (stream_a, magic_number_l));
@ -4011,6 +4062,15 @@ bool rai::message::read_header (rai::stream & stream_a, uint8_t & version_max_a,
if (!result)
{
result = rai::read (stream_a, type_a);
if (!result)
{
uint64_t extensions_l;
result = rai::read (stream_a, extensions_l);
if (!result)
{
extensions_a = extensions_l;
}
}
}
}
}

View file

@ -141,7 +141,7 @@ namespace rai {
message (rai::message_type);
virtual ~message () = default;
void write_header (rai::stream &);
static bool read_header (rai::stream &, uint8_t &, uint8_t &, uint8_t &, rai::message_type &);
static bool read_header (rai::stream &, uint8_t &, uint8_t &, uint8_t &, rai::message_type &, std::bitset <64> &);
virtual void serialize (rai::stream &) = 0;
virtual bool deserialize (rai::stream &) = 0;
virtual void visit (rai::message_visitor &) const = 0;
@ -503,7 +503,7 @@ namespace rai {
bootstrap_connection (std::shared_ptr <boost::asio::ip::tcp::socket>, std::shared_ptr <rai::client>);
~bootstrap_connection ();
void receive ();
void receive_type_action (boost::system::error_code const &, size_t);
void receive_header_action (boost::system::error_code const &, size_t);
void receive_bulk_req_action (boost::system::error_code const &, size_t);
void receive_frontier_req_action (boost::system::error_code const &, size_t);
void add_request (std::unique_ptr <rai::message>);

View file

@ -28,9 +28,10 @@ TEST (message, keepalive_deserialize)
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
rai::message_type type;
std::bitset <64> extensions;
rai::bufferstream header_stream (bytes.data (), bytes.size ());
ASSERT_FALSE (rai::message::read_header (header_stream, version_max, version_using, version_min, type));
ASSERT_FALSE (rai::message::read_header (header_stream, version_max, version_using, version_min, type, extensions));
ASSERT_EQ (rai::message_type::keepalive, type);
rai::keepalive message2;
rai::bufferstream stream (bytes.data (), bytes.size ());

View file

@ -31,8 +31,9 @@ TEST (publish, serialization)
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
ASSERT_FALSE (rai::message::read_header (stream, version_max, version_using, version_min, type));
rai::message_type type;
std::bitset <64> extensions;
ASSERT_FALSE (rai::message::read_header (stream, version_max, version_using, version_min, type, extensions));
ASSERT_EQ (0x01, version_min);
ASSERT_EQ (0x01, version_using);
ASSERT_EQ (0x01, version_max);
@ -646,10 +647,13 @@ TEST (network, receive_weight_change)
system.clients [1]->wallet.insert (key2.prv);
system.clients [1]->representative = key2.pub;
ASSERT_FALSE (system.clients [0]->transactions.send (key2.pub, 2));
auto iterations (0);
while (std::any_of (system.clients.begin (), system.clients.end (), [&] (std::shared_ptr <rai::client> const & client_a) {return client_a->ledger.weight (key2.pub) != 2;}))
{
system.service->poll_one ();
system.processor.poll_one ();
++iterations;
ASSERT_LT (iterations, 200);
}
}