Unifying keepalive.

This commit is contained in:
clemahieu 2014-10-27 20:05:50 -05:00
commit aeebb43d95
4 changed files with 49 additions and 162 deletions

View file

@ -93,8 +93,7 @@ socket (service_a, boost::asio::ip::udp::endpoint (boost::asio::ip::address_v4::
service (service_a),
resolver (service_a),
client (client_a),
keepalive_req_count (0),
keepalive_ack_count (0),
keepalive_count (0),
publish_req_count (0),
confirm_req_count (0),
confirm_ack_count (0),
@ -128,8 +127,9 @@ void rai::network::maintain_keepalive (boost::asio::ip::udp::endpoint const & en
{
if (!client.peers.contacting_peer (endpoint_a) && endpoint_a != endpoint ())
{
rai::keepalive_req message;
rai::keepalive message;
client.peers.random_fill (message.peers);
message.checksum = client.ledger.checksum (0, std::numeric_limits <rai::uint256_t>::max ());
std::shared_ptr <std::vector <uint8_t>> bytes (new std::vector <uint8_t>);
{
rai::vectorstream stream (*bytes);
@ -228,32 +228,15 @@ void rai::network::receive_action (boost::system::error_code const & error, size
read (type_stream, type);
switch (type)
{
case rai::message_type::keepalive_req:
case rai::message_type::keepalive:
{
rai::keepalive_req incoming;
rai::keepalive incoming;
rai::bufferstream stream (buffer.data (), size_a);
auto error (incoming.deserialize (stream));
receive ();
if (!error)
{
++keepalive_req_count;
client.processor.process_message (incoming, sender);
}
else
{
++error_count;
}
break;
}
case rai::message_type::keepalive_ack:
{
rai::keepalive_ack incoming;
rai::bufferstream stream (buffer.data (), size_a);
auto error (incoming.deserialize (stream));
receive ();
if (!error)
{
++keepalive_ack_count;
++keepalive_count;
client.processor.process_message (incoming, sender);
}
else
@ -1291,14 +1274,9 @@ std::vector <rai::peer_information> rai::peer_container::list ()
return result;
}
void rai::keepalive_req::visit (rai::message_visitor & visitor_a) const
void rai::keepalive::visit (rai::message_visitor & visitor_a) const
{
visitor_a.keepalive_req (*this);
}
void rai::keepalive_ack::visit (rai::message_visitor & visitor_a) const
{
visitor_a.keepalive_ack (*this);
visitor_a.keepalive (*this);
}
void rai::publish::visit (rai::message_visitor & visitor_a) const
@ -1306,9 +1284,9 @@ void rai::publish::visit (rai::message_visitor & visitor_a) const
visitor_a.publish (*this);
}
void rai::keepalive_ack::serialize (rai::stream & stream_a)
void rai::keepalive::serialize (rai::stream & stream_a)
{
write (stream_a, rai::message_type::keepalive_ack);
write (stream_a, rai::message_type::keepalive);
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
{
uint32_t address (i->address ().to_v4 ().to_ulong ());
@ -1318,11 +1296,11 @@ void rai::keepalive_ack::serialize (rai::stream & stream_a)
write (stream_a, checksum);
}
bool rai::keepalive_ack::deserialize (rai::stream & stream_a)
bool rai::keepalive::deserialize (rai::stream & stream_a)
{
rai::message_type type;
auto result (read (stream_a, type));
assert (type == rai::message_type::keepalive_ack);
assert (type == rai::message_type::keepalive);
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
{
uint32_t address;
@ -1335,33 +1313,6 @@ bool rai::keepalive_ack::deserialize (rai::stream & stream_a)
return result;
}
void rai::keepalive_req::serialize (rai::stream & stream_a)
{
write (stream_a, rai::message_type::keepalive_req);
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
{
uint32_t address (i->address ().to_v4 ().to_ulong ());
write (stream_a, address);
write (stream_a, i->port ());
}
}
bool rai::keepalive_req::deserialize (rai::stream & stream_a)
{
rai::message_type type;
auto result (read (stream_a, type));
assert (type == rai::message_type::keepalive_req);
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
{
uint32_t address;
uint16_t port;
read (stream_a, address);
read (stream_a, port);
*i = rai::endpoint (boost::asio::ip::address_v4 (address), port);
}
return result;
}
size_t rai::processor_service::size ()
{
std::lock_guard <std::mutex> lock (mutex);
@ -2237,36 +2188,32 @@ public:
connection (connection_a)
{
}
void keepalive_req (rai::keepalive_req const &)
void keepalive (rai::keepalive const &) override
{
assert (false);
}
void keepalive_ack (rai::keepalive_ack const &)
void publish (rai::publish const &) override
{
assert (false);
}
void publish (rai::publish const &)
void confirm_req (rai::confirm_req const &) override
{
assert (false);
}
void confirm_req (rai::confirm_req const &)
void confirm_ack (rai::confirm_ack const &) override
{
assert (false);
}
void confirm_ack (rai::confirm_ack const &)
void confirm_unk (rai::confirm_unk const &) override
{
assert (false);
}
void confirm_unk (rai::confirm_unk const &)
{
assert (false);
}
void bulk_req (rai::bulk_req const &)
void bulk_req (rai::bulk_req const &) override
{
auto response (std::make_shared <rai::bulk_req_response> (connection, std::unique_ptr <rai::bulk_req> (static_cast <rai::bulk_req *> (connection->requests.front ().release ()))));
response->send_next ();
}
void frontier_req (rai::frontier_req const &)
void frontier_req (rai::frontier_req const &) override
{
auto response (std::make_shared <rai::frontier_req_response> (connection, std::unique_ptr <rai::frontier_req> (static_cast <rai::frontier_req *> (connection->requests.front ().release ()))));
response->send_next ();
@ -2413,36 +2360,32 @@ public:
connection (connection_a)
{
}
void keepalive_req (rai::keepalive_req const &)
void keepalive (rai::keepalive const &) override
{
assert (false);
}
void keepalive_ack (rai::keepalive_ack const &)
void publish (rai::publish const &) override
{
assert (false);
}
void publish (rai::publish const &)
void confirm_req (rai::confirm_req const &) override
{
assert (false);
}
void confirm_req (rai::confirm_req const &)
void confirm_ack (rai::confirm_ack const &) override
{
assert (false);
}
void confirm_ack (rai::confirm_ack const &)
void confirm_unk (rai::confirm_unk const &) override
{
assert (false);
}
void confirm_unk (rai::confirm_unk const &)
{
assert (false);
}
void bulk_req (rai::bulk_req const &)
void bulk_req (rai::bulk_req const &) override
{
auto response (std::make_shared <rai::bulk_req_initiator> (connection, std::unique_ptr <rai::bulk_req> (static_cast <rai::bulk_req *> (connection->requests.front ().release ()))));
response->receive_block ();
}
void frontier_req (rai::frontier_req const &)
void frontier_req (rai::frontier_req const &) override
{
auto response (std::make_shared <rai::frontier_req_initiator> (connection, std::unique_ptr <rai::frontier_req> (static_cast <rai::frontier_req *> (connection->requests.front ().release ()))));
response->receive_frontier ();
@ -3285,7 +3228,7 @@ void rai::block_store::checksum_del (uint64_t prefix, uint8_t mask)
checksum->Delete (leveldb::WriteOptions (), leveldb::Slice (reinterpret_cast <char const *> (&key), sizeof (uint64_t)));
}
bool rai::keepalive_ack::operator == (rai::keepalive_ack const & other_a) const
bool rai::keepalive::operator == (rai::keepalive const & other_a) const
{
return (peers == other_a.peers) && (checksum == other_a.checksum);
}
@ -3598,42 +3541,11 @@ public:
sender (sender_a)
{
}
void keepalive_req (rai::keepalive_req const & message_a) override
void keepalive (rai::keepalive const & message_a) override
{
if (network_keepalive_logging ())
{
client.log.add (boost::str (boost::format ("Received keepalive req from %1%") % sender));
}
rai::keepalive_ack ack_message;
client.peers.random_fill (ack_message.peers);
ack_message.checksum = client.ledger.checksum (0, std::numeric_limits <rai::uint256_t>::max ());
std::shared_ptr <std::vector <uint8_t>> ack_bytes (new std::vector <uint8_t>);
{
rai::vectorstream stream (*ack_bytes);
ack_message.serialize (stream);
}
auto client_l (client.shared ());
client.network.send_buffer (ack_bytes->data (), ack_bytes->size (), sender, [ack_bytes, client_l] (boost::system::error_code const & error, size_t size_a)
{
if (network_logging ())
{
if (error)
{
client_l->log.add (boost::str (boost::format ("Error sending keepalive ack: %1%") % error.message ()));
}
}
});
client.network.merge_peers (message_a.peers);
if (network_keepalive_logging ())
{
client.log.add (boost::str (boost::format ("Sending keepalive ack to %1%") % sender));
}
}
void keepalive_ack (rai::keepalive_ack const & message_a) override
{
if (network_keepalive_logging ())
{
client.log.add (boost::str (boost::format ("Received keepalive ack from %1%") % sender));
client.log.add (boost::str (boost::format ("Received keepalive from %1%") % sender));
}
client.network.merge_peers (message_a.peers);
if (message_a.checksum != client.ledger.checksum (0, std::numeric_limits <rai::uint256_t>::max ()))

View file

@ -120,8 +120,7 @@ namespace rai {
{
invalid,
not_a_type,
keepalive_req,
keepalive_ack,
keepalive,
publish,
confirm_req,
confirm_ack,
@ -137,21 +136,13 @@ namespace rai {
virtual void serialize (rai::stream &) = 0;
virtual void visit (rai::message_visitor &) const = 0;
};
class keepalive_req : public message
class keepalive : public message
{
public:
void visit (rai::message_visitor &) const override;
bool deserialize (rai::stream &);
void serialize (rai::stream &) override;
std::array <rai::endpoint, 24> peers;
};
class keepalive_ack : public message
{
public:
void visit (rai::message_visitor &) const override;
bool deserialize (rai::stream &);
void serialize (rai::stream &) override;
bool operator == (rai::keepalive_ack const &) const;
bool operator == (rai::keepalive const &) const;
std::array <rai::endpoint, 24> peers;
rai::uint256_union checksum;
};
@ -220,8 +211,7 @@ namespace rai {
class message_visitor
{
public:
virtual void keepalive_req (rai::keepalive_req const &) = 0;
virtual void keepalive_ack (rai::keepalive_ack const &) = 0;
virtual void keepalive (rai::keepalive const &) = 0;
virtual void publish (rai::publish const &) = 0;
virtual void confirm_req (rai::confirm_req const &) = 0;
virtual void confirm_ack (rai::confirm_ack const &) = 0;
@ -457,8 +447,7 @@ namespace rai {
rai::client & client;
std::queue <std::tuple <uint8_t const *, size_t, rai::endpoint, std::function <void (boost::system::error_code const &, size_t)>>> sends;
std::mutex mutex;
uint64_t keepalive_req_count;
uint64_t keepalive_ack_count;
uint64_t keepalive_count;
uint64_t publish_req_count;
uint64_t confirm_req_count;
uint64_t confirm_ack_count;

View file

@ -440,13 +440,13 @@ TEST (frontier_req, serialization)
TEST (keepalive_ack, serialization)
{
rai::keepalive_ack request1;
rai::keepalive request1;
std::vector <uint8_t> bytes;
{
rai::vectorstream stream (bytes);
request1.serialize (stream);
}
rai::keepalive_ack request2;
rai::keepalive request2;
rai::bufferstream buffer (bytes.data (), bytes.size ());
ASSERT_FALSE (request2.deserialize (buffer));
ASSERT_EQ (request1, request2);

View file

@ -63,33 +63,18 @@ TEST (network, self_discard)
ASSERT_EQ (1, system.clients [0]->network.bad_sender_count);
}
TEST (keepalive_req, deserialize)
{
rai::keepalive_req message1;
rai::endpoint endpoint (boost::asio::ip::address_v4 (0x7f000001), 10000);
message1.peers [0] = endpoint;
std::vector <uint8_t> bytes;
{
rai::vectorstream stream (bytes);
message1.serialize (stream);
}
rai::keepalive_req message2;
rai::bufferstream stream (bytes.data (), bytes.size ());
ASSERT_FALSE (message2.deserialize (stream));
ASSERT_EQ (message1.peers, message2.peers);
}
TEST (keepalive_ack, deserialize)
{
rai::keepalive_ack message1;
rai::keepalive message1;
rai::endpoint endpoint (boost::asio::ip::address_v4 (0x7f000001), 10000);
message1.peers [0] = endpoint;
message1.checksum = 1;
std::vector <uint8_t> bytes;
{
rai::vectorstream stream (bytes);
message1.serialize (stream);
}
rai::keepalive_ack message2;
rai::keepalive message2;
rai::bufferstream stream (bytes.data (), bytes.size ());
ASSERT_FALSE (message2.deserialize (stream));
ASSERT_EQ (message1.peers, message2.peers);
@ -97,22 +82,23 @@ TEST (keepalive_ack, deserialize)
TEST (network, send_keepalive)
{
rai::system system (24000, 2);
rai::system system (24000, 1);
auto list1 (system.clients [0]->peers.list ());
ASSERT_EQ (1, list1.size ());
while (list1 [0].last_contact == std::chrono::system_clock::now ());
system.clients [0]->network.maintain_keepalive (system.clients [1]->network.endpoint ());
auto initial (system.clients [0]->network.keepalive_ack_count);
while (system.clients [0]->network.keepalive_ack_count == initial)
ASSERT_EQ (0, list1.size ());
rai::client_init init1;
auto client1 (std::make_shared <rai::client> (init1, system.service, 24001, system.processor, rai::test_genesis_key.pub));
client1->start ();
system.clients [0]->network.maintain_keepalive (client1->network.endpoint ());
auto initial (system.clients [0]->network.keepalive_count);
while (system.clients [0]->network.keepalive_count == initial)
{
system.service->run_one ();
}
auto peers1 (system.clients [0]->peers.list ());
auto peers2 (system.clients [1]->peers.list ());
auto peers2 (client1->peers.list ());
ASSERT_EQ (1, peers1.size ());
ASSERT_EQ (1, peers2.size ());
ASSERT_NE (peers1.end (), std::find_if (peers1.begin (), peers1.end (), [&system] (rai::peer_information const & information_a) {return information_a.endpoint == system.clients [1]->network.endpoint ();}));
ASSERT_GT (peers1 [0].last_contact, list1 [0].last_contact);
ASSERT_NE (peers1.end (), std::find_if (peers1.begin (), peers1.end (), [&client1] (rai::peer_information const & information_a) {return information_a.endpoint == client1->network.endpoint ();}));
ASSERT_NE (peers2.end (), std::find_if (peers2.begin (), peers2.end (), [&system] (rai::peer_information const & information_a) {return information_a.endpoint == system.clients [0]->network.endpoint ();}));
}