Improved error handling in message parser (#704)
This commit is contained in:
parent
650a1e6139
commit
003779446f
4 changed files with 114 additions and 49 deletions
|
@ -74,14 +74,14 @@ TEST (message_parser, exact_confirm_ack_size)
|
|||
message.serialize (stream);
|
||||
}
|
||||
ASSERT_EQ (0, visitor.confirm_ack_count);
|
||||
ASSERT_FALSE (parser.error);
|
||||
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
|
||||
parser.deserialize_confirm_ack (bytes.data (), bytes.size ());
|
||||
ASSERT_EQ (1, visitor.confirm_ack_count);
|
||||
ASSERT_FALSE (parser.error);
|
||||
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
|
||||
bytes.push_back (0);
|
||||
parser.deserialize_confirm_ack (bytes.data (), bytes.size ());
|
||||
ASSERT_EQ (1, visitor.confirm_ack_count);
|
||||
ASSERT_TRUE (parser.error);
|
||||
ASSERT_NE (parser.status, rai::message_parser::parse_status::success);
|
||||
}
|
||||
|
||||
TEST (message_parser, exact_confirm_req_size)
|
||||
|
@ -97,14 +97,14 @@ TEST (message_parser, exact_confirm_req_size)
|
|||
message.serialize (stream);
|
||||
}
|
||||
ASSERT_EQ (0, visitor.confirm_req_count);
|
||||
ASSERT_FALSE (parser.error);
|
||||
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
|
||||
parser.deserialize_confirm_req (bytes.data (), bytes.size ());
|
||||
ASSERT_EQ (1, visitor.confirm_req_count);
|
||||
ASSERT_FALSE (parser.error);
|
||||
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
|
||||
bytes.push_back (0);
|
||||
parser.deserialize_confirm_req (bytes.data (), bytes.size ());
|
||||
ASSERT_EQ (1, visitor.confirm_req_count);
|
||||
ASSERT_TRUE (parser.error);
|
||||
ASSERT_NE (parser.status, rai::message_parser::parse_status::success);
|
||||
}
|
||||
|
||||
TEST (message_parser, exact_publish_size)
|
||||
|
@ -120,14 +120,14 @@ TEST (message_parser, exact_publish_size)
|
|||
message.serialize (stream);
|
||||
}
|
||||
ASSERT_EQ (0, visitor.publish_count);
|
||||
ASSERT_FALSE (parser.error);
|
||||
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
|
||||
parser.deserialize_publish (bytes.data (), bytes.size ());
|
||||
ASSERT_EQ (1, visitor.publish_count);
|
||||
ASSERT_FALSE (parser.error);
|
||||
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
|
||||
bytes.push_back (0);
|
||||
parser.deserialize_publish (bytes.data (), bytes.size ());
|
||||
ASSERT_EQ (1, visitor.publish_count);
|
||||
ASSERT_TRUE (parser.error);
|
||||
ASSERT_NE (parser.status, rai::message_parser::parse_status::success);
|
||||
}
|
||||
|
||||
TEST (message_parser, exact_keepalive_size)
|
||||
|
@ -142,12 +142,12 @@ TEST (message_parser, exact_keepalive_size)
|
|||
message.serialize (stream);
|
||||
}
|
||||
ASSERT_EQ (0, visitor.keepalive_count);
|
||||
ASSERT_FALSE (parser.error);
|
||||
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
|
||||
parser.deserialize_keepalive (bytes.data (), bytes.size ());
|
||||
ASSERT_EQ (1, visitor.keepalive_count);
|
||||
ASSERT_FALSE (parser.error);
|
||||
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
|
||||
bytes.push_back (0);
|
||||
parser.deserialize_keepalive (bytes.data (), bytes.size ());
|
||||
ASSERT_EQ (1, visitor.keepalive_count);
|
||||
ASSERT_TRUE (parser.error);
|
||||
ASSERT_NE (parser.status, rai::message_parser::parse_status::success);
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
|
||||
#include <rai/node/common.hpp>
|
||||
|
||||
#include <rai/lib/work.hpp>
|
||||
|
@ -73,14 +74,13 @@ bool rai::message::read_header (rai::stream & stream_a, uint8_t & version_max_a,
|
|||
rai::message_parser::message_parser (rai::message_visitor & visitor_a, rai::work_pool & pool_a) :
|
||||
visitor (visitor_a),
|
||||
pool (pool_a),
|
||||
error (false),
|
||||
insufficient_work (false)
|
||||
status (parse_status::success)
|
||||
{
|
||||
}
|
||||
|
||||
void rai::message_parser::deserialize_buffer (uint8_t const * buffer_a, size_t size_a)
|
||||
{
|
||||
error = false;
|
||||
status = parse_status::success;
|
||||
rai::bufferstream header_stream (buffer_a, size_a);
|
||||
uint8_t version_max;
|
||||
uint8_t version_using;
|
||||
|
@ -113,14 +113,14 @@ void rai::message_parser::deserialize_buffer (uint8_t const * buffer_a, size_t s
|
|||
}
|
||||
default:
|
||||
{
|
||||
error = true;
|
||||
status = parse_status::invalid_message_type;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
error = true;
|
||||
status = parse_status::invalid_header;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -135,7 +135,7 @@ void rai::message_parser::deserialize_keepalive (uint8_t const * buffer_a, size_
|
|||
}
|
||||
else
|
||||
{
|
||||
error = true;
|
||||
status = parse_status::invalid_keepalive_message;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -152,12 +152,12 @@ void rai::message_parser::deserialize_publish (uint8_t const * buffer_a, size_t
|
|||
}
|
||||
else
|
||||
{
|
||||
insufficient_work = true;
|
||||
status = parse_status::insufficient_work;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
error = true;
|
||||
status = parse_status::invalid_publish_message;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -174,12 +174,12 @@ void rai::message_parser::deserialize_confirm_req (uint8_t const * buffer_a, siz
|
|||
}
|
||||
else
|
||||
{
|
||||
insufficient_work = true;
|
||||
status = parse_status::insufficient_work;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
error = true;
|
||||
status = parse_status::invalid_confirm_req_message;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -196,12 +196,12 @@ void rai::message_parser::deserialize_confirm_ack (uint8_t const * buffer_a, siz
|
|||
}
|
||||
else
|
||||
{
|
||||
insufficient_work = true;
|
||||
status = parse_status::insufficient_work;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
error = true;
|
||||
status = parse_status::invalid_confirm_ack_message;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -241,18 +241,23 @@ void rai::keepalive::serialize (rai::stream & stream_a)
|
|||
|
||||
bool rai::keepalive::deserialize (rai::stream & stream_a)
|
||||
{
|
||||
auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions));
|
||||
assert (!result);
|
||||
auto error (read_header (stream_a, version_max, version_using, version_min, type, extensions));
|
||||
assert (!error);
|
||||
assert (type == rai::message_type::keepalive);
|
||||
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
|
||||
for (auto i (peers.begin ()), j (peers.end ()); i != j && !error; ++i)
|
||||
{
|
||||
std::array<uint8_t, 16> address;
|
||||
uint16_t port;
|
||||
read (stream_a, address);
|
||||
read (stream_a, port);
|
||||
*i = rai::endpoint (boost::asio::ip::address_v6 (address), port);
|
||||
if (!read (stream_a, address) && !read (stream_a, port))
|
||||
{
|
||||
*i = rai::endpoint (boost::asio::ip::address_v6 (address), port);
|
||||
}
|
||||
else
|
||||
{
|
||||
error = true;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return error;
|
||||
}
|
||||
|
||||
bool rai::keepalive::operator== (rai::keepalive const & other_a) const
|
||||
|
|
|
@ -130,6 +130,17 @@ class work_pool;
|
|||
class message_parser
|
||||
{
|
||||
public:
|
||||
enum class parse_status
|
||||
{
|
||||
success,
|
||||
insufficient_work,
|
||||
invalid_header,
|
||||
invalid_message_type,
|
||||
invalid_keepalive_message,
|
||||
invalid_publish_message,
|
||||
invalid_confirm_req_message,
|
||||
invalid_confirm_ack_message
|
||||
};
|
||||
message_parser (rai::message_visitor &, rai::work_pool &);
|
||||
void deserialize_buffer (uint8_t const *, size_t);
|
||||
void deserialize_keepalive (uint8_t const *, size_t);
|
||||
|
@ -139,8 +150,7 @@ public:
|
|||
bool at_end (rai::bufferstream &);
|
||||
rai::message_visitor & visitor;
|
||||
rai::work_pool & pool;
|
||||
bool error;
|
||||
bool insufficient_work;
|
||||
parse_status status;
|
||||
};
|
||||
class keepalive : public message
|
||||
{
|
||||
|
|
|
@ -419,17 +419,65 @@ void rai::network::receive_action (boost::system::error_code const & error, size
|
|||
network_message_visitor visitor (node, remote);
|
||||
rai::message_parser parser (visitor, node.work);
|
||||
parser.deserialize_buffer (buffer.data (), size_a);
|
||||
if (parser.error)
|
||||
if (parser.status != rai::message_parser::parse_status::success)
|
||||
{
|
||||
++error_count;
|
||||
}
|
||||
else if (parser.insufficient_work)
|
||||
{
|
||||
if (node.config.logging.insufficient_work_logging ())
|
||||
|
||||
if (parser.status == rai::message_parser::parse_status::insufficient_work)
|
||||
{
|
||||
BOOST_LOG (node.log) << "Insufficient work in message";
|
||||
if (node.config.logging.insufficient_work_logging ())
|
||||
{
|
||||
BOOST_LOG (node.log) << "Insufficient work in message";
|
||||
}
|
||||
|
||||
++insufficient_work_count;
|
||||
}
|
||||
else if (parser.status == rai::message_parser::parse_status::invalid_message_type)
|
||||
{
|
||||
if (node.config.logging.network_logging ())
|
||||
{
|
||||
BOOST_LOG (node.log) << "Invalid message type in message";
|
||||
}
|
||||
}
|
||||
else if (parser.status == rai::message_parser::parse_status::invalid_header)
|
||||
{
|
||||
if (node.config.logging.network_logging ())
|
||||
{
|
||||
BOOST_LOG (node.log) << "Invalid header in message";
|
||||
}
|
||||
}
|
||||
else if (parser.status == rai::message_parser::parse_status::invalid_keepalive_message)
|
||||
{
|
||||
if (node.config.logging.network_logging ())
|
||||
{
|
||||
BOOST_LOG (node.log) << "Invalid keepalive message";
|
||||
}
|
||||
}
|
||||
else if (parser.status == rai::message_parser::parse_status::invalid_publish_message)
|
||||
{
|
||||
if (node.config.logging.network_logging ())
|
||||
{
|
||||
BOOST_LOG (node.log) << "Invalid publish message";
|
||||
}
|
||||
}
|
||||
else if (parser.status == rai::message_parser::parse_status::invalid_confirm_req_message)
|
||||
{
|
||||
if (node.config.logging.network_logging ())
|
||||
{
|
||||
BOOST_LOG (node.log) << "Invalid confirm_req message";
|
||||
}
|
||||
}
|
||||
else if (parser.status == rai::message_parser::parse_status::invalid_confirm_ack_message)
|
||||
{
|
||||
if (node.config.logging.network_logging ())
|
||||
{
|
||||
BOOST_LOG (node.log) << "Invalid confirm_ack message";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
BOOST_LOG (node.log) << "Could not deserialize buffer";
|
||||
}
|
||||
++insufficient_work_count;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -2509,16 +2557,18 @@ void rai::peer_container::rep_request (rai::endpoint const & endpoint_a)
|
|||
|
||||
bool rai::peer_container::reachout (rai::endpoint const & endpoint_a)
|
||||
{
|
||||
auto result (false);
|
||||
// Don't contact invalid IPs
|
||||
result |= not_a_peer (endpoint_a);
|
||||
// Don't keepalive to nodes that already sent us something
|
||||
result |= known_peer (endpoint_a);
|
||||
std::lock_guard<std::mutex> lock (mutex);
|
||||
auto existing (attempts.find (endpoint_a));
|
||||
result |= existing != attempts.end ();
|
||||
attempts.insert ({ endpoint_a, std::chrono::steady_clock::now () });
|
||||
return result;
|
||||
bool error = not_a_peer (endpoint_a);
|
||||
if (!error)
|
||||
{
|
||||
// Don't keepalive to nodes that already sent us something
|
||||
error |= known_peer (endpoint_a);
|
||||
std::lock_guard<std::mutex> lock (mutex);
|
||||
auto existing (attempts.find (endpoint_a));
|
||||
error |= existing != attempts.end ();
|
||||
attempts.insert ({ endpoint_a, std::chrono::steady_clock::now () });
|
||||
}
|
||||
return error;
|
||||
}
|
||||
|
||||
bool rai::peer_container::insert (rai::endpoint const & endpoint_a, unsigned version_a)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue