Add is_originator flag to publish messages (#4654)
* Publish message `originator` flag * Set and use the `is_originator` flag when publishing and processing blocks * Cleanup
This commit is contained in:
parent
21abfc2ae4
commit
1cd5564be5
8 changed files with 111 additions and 39 deletions
|
|
@ -26,6 +26,31 @@ std::shared_ptr<nano::block> random_block ()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST (message, header_version)
|
||||||
|
{
|
||||||
|
// Simplest message type
|
||||||
|
nano::keepalive original{ nano::dev::network_params.network };
|
||||||
|
|
||||||
|
// Serialize the original keepalive message
|
||||||
|
std::vector<uint8_t> bytes;
|
||||||
|
{
|
||||||
|
nano::vectorstream stream (bytes);
|
||||||
|
original.serialize (stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deserialize the byte stream back to a message header
|
||||||
|
nano::bufferstream stream (bytes.data (), bytes.size ());
|
||||||
|
bool error = false;
|
||||||
|
nano::message_header header (error, stream);
|
||||||
|
ASSERT_FALSE (error);
|
||||||
|
|
||||||
|
// Check header versions
|
||||||
|
ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, header.version_min);
|
||||||
|
ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_using);
|
||||||
|
ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_max);
|
||||||
|
ASSERT_EQ (nano::message_type::keepalive, header.type);
|
||||||
|
}
|
||||||
|
|
||||||
TEST (message, keepalive_serialization)
|
TEST (message, keepalive_serialization)
|
||||||
{
|
{
|
||||||
nano::keepalive request1{ nano::dev::network_params.network };
|
nano::keepalive request1{ nano::dev::network_params.network };
|
||||||
|
|
@ -62,33 +87,60 @@ TEST (message, keepalive_deserialize)
|
||||||
ASSERT_EQ (message1.peers, message2.peers);
|
ASSERT_EQ (message1.peers, message2.peers);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST (message, publish_serialization)
|
TEST (message, publish)
|
||||||
{
|
{
|
||||||
|
// Create a random block
|
||||||
auto block = random_block ();
|
auto block = random_block ();
|
||||||
nano::publish publish{ nano::dev::network_params.network, block };
|
nano::publish original{ nano::dev::network_params.network, block };
|
||||||
ASSERT_EQ (nano::block_type::send, publish.header.block_type ());
|
ASSERT_FALSE (original.is_originator ());
|
||||||
|
|
||||||
|
// Serialize the original publish message
|
||||||
std::vector<uint8_t> bytes;
|
std::vector<uint8_t> bytes;
|
||||||
{
|
{
|
||||||
nano::vectorstream stream (bytes);
|
nano::vectorstream stream (bytes);
|
||||||
publish.header.serialize (stream);
|
original.serialize (stream);
|
||||||
}
|
}
|
||||||
ASSERT_EQ (8, bytes.size ());
|
|
||||||
ASSERT_EQ (0x52, bytes[0]);
|
// Deserialize the byte stream back to a publish message
|
||||||
ASSERT_EQ (0x41, bytes[1]);
|
|
||||||
ASSERT_EQ (nano::dev::network_params.network.protocol_version, bytes[2]);
|
|
||||||
ASSERT_EQ (nano::dev::network_params.network.protocol_version, bytes[3]);
|
|
||||||
ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, bytes[4]);
|
|
||||||
ASSERT_EQ (static_cast<uint8_t> (nano::message_type::publish), bytes[5]);
|
|
||||||
ASSERT_EQ (0x00, bytes[6]); // extensions
|
|
||||||
ASSERT_EQ (static_cast<uint8_t> (nano::block_type::send), bytes[7]);
|
|
||||||
nano::bufferstream stream (bytes.data (), bytes.size ());
|
nano::bufferstream stream (bytes.data (), bytes.size ());
|
||||||
auto error (false);
|
bool error = false;
|
||||||
nano::message_header header (error, stream);
|
nano::message_header header (error, stream);
|
||||||
ASSERT_FALSE (error);
|
ASSERT_FALSE (error);
|
||||||
ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, header.version_min);
|
nano::publish deserialized (error, stream, header);
|
||||||
ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_using);
|
ASSERT_FALSE (error);
|
||||||
ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_max);
|
|
||||||
ASSERT_EQ (nano::message_type::publish, header.type);
|
// Assert that the original and deserialized messages are equal
|
||||||
|
ASSERT_EQ (original, deserialized);
|
||||||
|
ASSERT_EQ (*original.block, *deserialized.block);
|
||||||
|
ASSERT_EQ (original.is_originator (), deserialized.is_originator ());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST (message, publish_originator_flag)
|
||||||
|
{
|
||||||
|
// Create a random block
|
||||||
|
auto block = random_block ();
|
||||||
|
nano::publish original{ nano::dev::network_params.network, block, /* originator */ true };
|
||||||
|
ASSERT_TRUE (original.is_originator ());
|
||||||
|
|
||||||
|
// Serialize the original publish message
|
||||||
|
std::vector<uint8_t> bytes;
|
||||||
|
{
|
||||||
|
nano::vectorstream stream (bytes);
|
||||||
|
original.serialize (stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deserialize the byte stream back to a publish message
|
||||||
|
nano::bufferstream stream (bytes.data (), bytes.size ());
|
||||||
|
bool error = false;
|
||||||
|
nano::message_header header (error, stream);
|
||||||
|
ASSERT_FALSE (error);
|
||||||
|
nano::publish deserialized (error, stream, header);
|
||||||
|
ASSERT_FALSE (error);
|
||||||
|
|
||||||
|
// Assert that the originator flag is set correctly in both the original and deserialized messages
|
||||||
|
ASSERT_TRUE (deserialized.is_originator ());
|
||||||
|
ASSERT_EQ (original, deserialized);
|
||||||
|
ASSERT_EQ (*original.block, *deserialized.block);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST (message, confirm_header_flags)
|
TEST (message, confirm_header_flags)
|
||||||
|
|
|
||||||
|
|
@ -176,6 +176,7 @@ enum class detail
|
||||||
|
|
||||||
// block source
|
// block source
|
||||||
live,
|
live,
|
||||||
|
live_originator,
|
||||||
bootstrap,
|
bootstrap,
|
||||||
bootstrap_legacy,
|
bootstrap_legacy,
|
||||||
unchecked,
|
unchecked,
|
||||||
|
|
|
||||||
|
|
@ -54,6 +54,7 @@ nano::block_processor::block_processor (nano::node & node_a) :
|
||||||
switch (origin.source)
|
switch (origin.source)
|
||||||
{
|
{
|
||||||
case nano::block_source::live:
|
case nano::block_source::live:
|
||||||
|
case nano::block_source::live_originator:
|
||||||
return config.max_peer_queue;
|
return config.max_peer_queue;
|
||||||
default:
|
default:
|
||||||
return config.max_system_queue;
|
return config.max_system_queue;
|
||||||
|
|
@ -64,6 +65,7 @@ nano::block_processor::block_processor (nano::node & node_a) :
|
||||||
switch (origin.source)
|
switch (origin.source)
|
||||||
{
|
{
|
||||||
case nano::block_source::live:
|
case nano::block_source::live:
|
||||||
|
case nano::block_source::live_originator:
|
||||||
return config.priority_live;
|
return config.priority_live;
|
||||||
case nano::block_source::bootstrap:
|
case nano::block_source::bootstrap:
|
||||||
case nano::block_source::bootstrap_legacy:
|
case nano::block_source::bootstrap_legacy:
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ enum class block_source
|
||||||
{
|
{
|
||||||
unknown = 0,
|
unknown = 0,
|
||||||
live,
|
live,
|
||||||
|
live_originator,
|
||||||
bootstrap,
|
bootstrap,
|
||||||
bootstrap_legacy,
|
bootstrap_legacy,
|
||||||
unchecked,
|
unchecked,
|
||||||
|
|
@ -67,10 +68,10 @@ public: // Context
|
||||||
class context
|
class context
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
context (std::shared_ptr<nano::block> block, block_source source);
|
context (std::shared_ptr<nano::block> block, nano::block_source source);
|
||||||
|
|
||||||
std::shared_ptr<nano::block> const block;
|
std::shared_ptr<nano::block> const block;
|
||||||
block_source const source;
|
nano::block_source const source;
|
||||||
std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () };
|
std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () };
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
@ -85,16 +86,16 @@ public: // Context
|
||||||
};
|
};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
block_processor (nano::node &);
|
explicit block_processor (nano::node &);
|
||||||
~block_processor ();
|
~block_processor ();
|
||||||
|
|
||||||
void start ();
|
void start ();
|
||||||
void stop ();
|
void stop ();
|
||||||
|
|
||||||
std::size_t size () const;
|
std::size_t size () const;
|
||||||
std::size_t size (block_source) const;
|
std::size_t size (nano::block_source) const;
|
||||||
bool add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
|
bool add (std::shared_ptr<nano::block> const &, nano::block_source = nano::block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
|
||||||
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
|
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, nano::block_source);
|
||||||
void force (std::shared_ptr<nano::block> const &);
|
void force (std::shared_ptr<nano::block> const &);
|
||||||
bool should_log ();
|
bool should_log ();
|
||||||
|
|
||||||
|
|
@ -127,7 +128,7 @@ private: // Dependencies
|
||||||
nano::node & node;
|
nano::node & node;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
nano::fair_queue<context, block_source> queue;
|
nano::fair_queue<context, nano::block_source> queue;
|
||||||
|
|
||||||
std::chrono::steady_clock::time_point next_log;
|
std::chrono::steady_clock::time_point next_log;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -157,7 +157,6 @@ void nano::message_processor::run_batch (nano::unique_lock<nano::mutex> & lock)
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
// TODO: This was moved, so compare with latest develop before merging to avoid merge bugs
|
|
||||||
class process_visitor : public nano::message_visitor
|
class process_visitor : public nano::message_visitor
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
@ -184,7 +183,9 @@ public:
|
||||||
|
|
||||||
void publish (nano::publish const & message) override
|
void publish (nano::publish const & message) override
|
||||||
{
|
{
|
||||||
bool added = node.block_processor.add (message.block, nano::block_source::live, channel);
|
// Put blocks that are being initally broadcasted in a separate queue, so that they won't have to compete with rebroadcasted blocks
|
||||||
|
// Both queues have the same priority and size, so the potential for exploiting this is limited
|
||||||
|
bool added = node.block_processor.add (message.block, message.is_originator () ? nano::block_source::live_originator : nano::block_source::live, channel);
|
||||||
if (!added)
|
if (!added)
|
||||||
{
|
{
|
||||||
node.network.publish_filter.clear (message.digest);
|
node.network.publish_filter.clear (message.digest);
|
||||||
|
|
|
||||||
|
|
@ -433,11 +433,12 @@ nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_h
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
nano::publish::publish (nano::network_constants const & constants, std::shared_ptr<nano::block> const & block_a) :
|
nano::publish::publish (nano::network_constants const & constants, std::shared_ptr<nano::block> const & block_a, bool is_originator_a) :
|
||||||
message (constants, nano::message_type::publish),
|
message (constants, nano::message_type::publish),
|
||||||
block (block_a)
|
block (block_a)
|
||||||
{
|
{
|
||||||
header.block_type_set (block->type ());
|
header.block_type_set (block->type ());
|
||||||
|
header.flag_set (originator_flag, is_originator_a);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::publish::serialize (nano::stream & stream_a) const
|
void nano::publish::serialize (nano::stream & stream_a) const
|
||||||
|
|
@ -465,11 +466,17 @@ bool nano::publish::operator== (nano::publish const & other_a) const
|
||||||
return *block == *other_a.block;
|
return *block == *other_a.block;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool nano::publish::is_originator () const
|
||||||
|
{
|
||||||
|
return header.flag_test (originator_flag);
|
||||||
|
}
|
||||||
|
|
||||||
void nano::publish::operator() (nano::object_stream & obs) const
|
void nano::publish::operator() (nano::object_stream & obs) const
|
||||||
{
|
{
|
||||||
nano::message::operator() (obs); // Write common data
|
nano::message::operator() (obs); // Write common data
|
||||||
|
|
||||||
obs.write ("block", block);
|
obs.write ("block", block);
|
||||||
|
obs.write ("originator", is_originator ());
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -682,6 +689,7 @@ void nano::confirm_ack::operator() (nano::object_stream & obs) const
|
||||||
nano::message::operator() (obs); // Write common data
|
nano::message::operator() (obs); // Write common data
|
||||||
|
|
||||||
obs.write ("vote", vote);
|
obs.write ("vote", vote);
|
||||||
|
obs.write ("rebroadcasted", is_rebroadcasted ());
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
||||||
|
|
@ -183,16 +183,23 @@ public: // Logging
|
||||||
*
|
*
|
||||||
* Header extensions:
|
* Header extensions:
|
||||||
* - [0x0f00] Block type: Identifies the specific type of the block.
|
* - [0x0f00] Block type: Identifies the specific type of the block.
|
||||||
|
* - [0x0004] Originator flag
|
||||||
*/
|
*/
|
||||||
class publish final : public message
|
class publish final : public message
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
publish (bool &, nano::stream &, nano::message_header const &, nano::uint128_t const & = 0, nano::block_uniquer * = nullptr);
|
publish (bool &, nano::stream &, nano::message_header const &, nano::uint128_t const & = 0, nano::block_uniquer * = nullptr);
|
||||||
publish (nano::network_constants const & constants, std::shared_ptr<nano::block> const &);
|
publish (nano::network_constants const & constants, std::shared_ptr<nano::block> const &, bool is_originator = false);
|
||||||
void visit (nano::message_visitor &) const override;
|
|
||||||
void serialize (nano::stream &) const override;
|
void serialize (nano::stream &) const override;
|
||||||
bool deserialize (nano::stream &, nano::block_uniquer * = nullptr);
|
bool deserialize (nano::stream &, nano::block_uniquer * = nullptr);
|
||||||
|
void visit (nano::message_visitor &) const override;
|
||||||
bool operator== (nano::publish const &) const;
|
bool operator== (nano::publish const &) const;
|
||||||
|
|
||||||
|
static uint8_t constexpr originator_flag = 2; // 0x0004
|
||||||
|
bool is_originator () const;
|
||||||
|
|
||||||
|
public: // Payload
|
||||||
std::shared_ptr<nano::block> block;
|
std::shared_ptr<nano::block> block;
|
||||||
nano::uint128_t digest{ 0 };
|
nano::uint128_t digest{ 0 };
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -247,22 +247,22 @@ void nano::network::flood_keepalive_self (float const scale_a)
|
||||||
flood_message (message, nano::transport::buffer_drop_policy::limiter, scale_a);
|
flood_message (message, nano::transport::buffer_drop_policy::limiter, scale_a);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_block (std::shared_ptr<nano::block> const & block_a, nano::transport::buffer_drop_policy const drop_policy_a)
|
void nano::network::flood_block (std::shared_ptr<nano::block> const & block, nano::transport::buffer_drop_policy const drop_policy)
|
||||||
{
|
{
|
||||||
nano::publish message (node.network_params.network, block_a);
|
nano::publish message{ node.network_params.network, block };
|
||||||
flood_message (message, drop_policy_a);
|
flood_message (message, drop_policy);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_block_initial (std::shared_ptr<nano::block> const & block_a)
|
void nano::network::flood_block_initial (std::shared_ptr<nano::block> const & block)
|
||||||
{
|
{
|
||||||
nano::publish message (node.network_params.network, block_a);
|
nano::publish message{ node.network_params.network, block, /* is_originator */ true };
|
||||||
for (auto const & i : node.rep_crawler.principal_representatives ())
|
for (auto const & rep : node.rep_crawler.principal_representatives ())
|
||||||
{
|
{
|
||||||
i.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
|
rep.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
|
||||||
}
|
}
|
||||||
for (auto & i : list_non_pr (fanout (1.0)))
|
for (auto & peer : list_non_pr (fanout (1.0)))
|
||||||
{
|
{
|
||||||
i->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
|
peer->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue