confirm_req_hash to reduce bandwidth usage (#1046)

* confirm_req_hash to reduce bandwidth usage

* Fixes

* Update tests

* rai::network::send_confirm_req_hash

* Formatting

* Confirm only hash for non-forked blocks

* Merge confirm by hash to confirm_req

* Typo

* Typo

* Fix issues

* Use send_confirm_req_hash in beta & test networks

* Remove unsused variable

* Batch confirmation request

* Logical errors

* Returning correct files permissions

* Simplify request by removing hash-only option

Usually root required for forks

* Fix issues

* confirm_req count for root-hash pairs

* Previous for send block cannot be 0

* Send vote-by-hash if successor = requested block

* restore file permissions

* Namespace renaming

* Networks remaning

* Fix republish_block

* Update test message_parser.exact_confirm_req_hash_size

* Formatting

* Add bitset for previous blocks in roots

* Use bool[32] for previous blocks in roots

* Test block.confirm_req_hash_bacth_serialization

* Typo

* Using auto & for vectors iteration

* Fix

* Use range-based for loop

* Use std::vector<uint8_t> bytes instead of shared pointer vector

* Fix message_parser.exact_confirm_req_hash_size test

* Use nano::block in test instead of pointers

* Use nano::block in test instead of pointers

* Use std::vector<uint8_t> bytes instead of shared pointer vector

* Better vectors initialization

* Correct assert for roots_hashes.size ()

* Make max hashes in confirm_req packet constant

Max count can be increased with TCP packets to 32

* Increase protocol version to 16

As indicator of support confirm_req by hash

* Remove unused variable

* Avoid using double root in confirm_req

* Recent changes fix
This commit is contained in:
Sergey Kroshnin 2019-01-28 22:07:18 +03:00 committed by GitHub
commit 3d81f1609e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 384 additions and 17 deletions

View file

@ -332,6 +332,58 @@ TEST (block, confirm_req_serialization)
ASSERT_EQ (*req.block, *req2.block);
}
TEST (block, confirm_req_hash_serialization)
{
nano::keypair key1;
nano::keypair key2;
nano::send_block block (1, key2.pub, 200, nano::keypair ().prv, 2, 3);
nano::confirm_req req (block.hash (), block.root ());
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
req.serialize (stream);
}
auto error (false);
nano::bufferstream stream2 (bytes.data (), bytes.size ());
nano::message_header header (error, stream2);
nano::confirm_req req2 (error, stream2, header);
ASSERT_FALSE (error);
ASSERT_EQ (req, req2);
ASSERT_EQ (req.roots_hashes, req2.roots_hashes);
}
TEST (block, confirm_req_hash_batch_serialization)
{
nano::keypair key;
nano::keypair representative;
std::vector<std::pair<nano::block_hash, nano::block_hash>> roots_hashes;
nano::state_block open (key.pub, 0, representative.pub, 2, 4, key.prv, key.pub, 5);
roots_hashes.push_back (std::make_pair (open.hash (), open.root ()));
for (auto i (roots_hashes.size ()); i < 7; i++)
{
nano::keypair key1;
nano::keypair previous;
nano::state_block block (key1.pub, previous.pub, representative.pub, 2, 4, key1.prv, key1.pub, 5);
roots_hashes.push_back (std::make_pair (block.hash (), block.root ()));
}
roots_hashes.push_back (std::make_pair (open.hash (), open.root ()));
nano::confirm_req req (roots_hashes);
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
req.serialize (stream);
}
auto error (false);
nano::bufferstream stream2 (bytes.data (), bytes.size ());
nano::message_header header (error, stream2);
nano::confirm_req req2 (error, stream2, header);
ASSERT_FALSE (error);
ASSERT_EQ (req, req2);
ASSERT_EQ (req.roots_hashes, req2.roots_hashes);
ASSERT_EQ (req.roots_hashes, roots_hashes);
ASSERT_EQ (req2.roots_hashes, roots_hashes);
}
TEST (state_block, serialization)
{
nano::keypair key1;

View file

@ -130,6 +130,38 @@ TEST (message_parser, exact_confirm_req_size)
ASSERT_NE (parser.status, nano::message_parser::parse_status::success);
}
TEST (message_parser, exact_confirm_req_hash_size)
{
nano::system system (24000, 1);
test_visitor visitor;
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::send_block block (1, 1, 2, nano::keypair ().prv, 4, system.work.generate (1));
nano::confirm_req message (block.hash (), block.root ());
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
message.serialize (stream);
}
ASSERT_EQ (0, visitor.confirm_req_count);
ASSERT_EQ (parser.status, nano::message_parser::parse_status::success);
auto error (false);
nano::bufferstream stream1 (bytes.data (), bytes.size ());
nano::message_header header1 (error, stream1);
ASSERT_FALSE (error);
parser.deserialize_confirm_req (stream1, header1);
ASSERT_EQ (1, visitor.confirm_req_count);
ASSERT_EQ (parser.status, nano::message_parser::parse_status::success);
bytes.push_back (0);
nano::bufferstream stream2 (bytes.data (), bytes.size ());
nano::message_header header2 (error, stream2);
ASSERT_FALSE (error);
parser.deserialize_confirm_req (stream2, header2);
ASSERT_EQ (1, visitor.confirm_req_count);
ASSERT_NE (parser.status, nano::message_parser::parse_status::success);
}
TEST (message_parser, exact_publish_size)
{
nano::system system (24000, 1);

View file

@ -302,7 +302,7 @@ void nano::message_parser::deserialize_confirm_req (nano::stream & stream_a, nan
nano::confirm_req incoming (error, stream_a, header_a, &block_uniquer);
if (!error && at_end (stream_a))
{
if (!nano::work_validate (*incoming.block))
if (incoming.block == nullptr || !nano::work_validate (*incoming.block))
{
visitor.confirm_req (incoming);
}
@ -484,12 +484,55 @@ block (block_a)
{
header.block_type_set (block->type ());
}
nano::confirm_req::confirm_req (std::vector<std::pair<nano::block_hash, nano::block_hash>> const & roots_hashes_a) :
message (nano::message_type::confirm_req),
roots_hashes (roots_hashes_a)
{
// not_a_block (1) block type for hashes + roots request
header.block_type_set (nano::block_type::not_a_block);
}
nano::confirm_req::confirm_req (nano::block_hash const & hash_a, nano::block_hash const & root_a) :
message (nano::message_type::confirm_req),
roots_hashes (std::vector<std::pair<nano::block_hash, nano::block_hash>> (1, std::make_pair (hash_a, root_a)))
{
assert (!roots_hashes.empty ());
// not_a_block (1) block type for hashes + roots request
header.block_type_set (nano::block_type::not_a_block);
}
bool nano::confirm_req::deserialize (nano::stream & stream_a, nano::block_uniquer * uniquer_a)
{
bool result (true);
assert (header.type == nano::message_type::confirm_req);
block = nano::deserialize_block (stream_a, header.block_type (), uniquer_a);
auto result (block == nullptr);
if (header.block_type () == nano::block_type::not_a_block)
{
uint8_t count (0);
result = read (stream_a, count);
for (auto i (0); i != count && !result; ++i)
{
nano::block_hash block_hash (0);
nano::block_hash root (0);
result = read (stream_a, block_hash);
if (!result && !block_hash.is_zero ())
{
result = read (stream_a, root);
if (!result && !root.is_zero ())
{
roots_hashes.push_back (std::make_pair (block_hash, root));
}
}
}
if (!result)
{
result = roots_hashes.empty () || (roots_hashes.size () != count);
}
}
else
{
block = nano::deserialize_block (stream_a, header.block_type (), uniquer_a);
result = block == nullptr;
}
return result;
}
@ -500,14 +543,53 @@ void nano::confirm_req::visit (nano::message_visitor & visitor_a) const
void nano::confirm_req::serialize (nano::stream & stream_a) const
{
assert (block != nullptr);
header.serialize (stream_a);
block->serialize (stream_a);
if (header.block_type () == nano::block_type::not_a_block)
{
assert (!roots_hashes.empty ());
// Calculate size
assert (roots_hashes.size () <= 32);
uint8_t count (roots_hashes.size ());
write (stream_a, count);
// Write hashes & roots
for (auto & root_hash : roots_hashes)
{
write (stream_a, root_hash.first);
write (stream_a, root_hash.second);
}
}
else
{
assert (block != nullptr);
block->serialize (stream_a);
}
}
bool nano::confirm_req::operator== (nano::confirm_req const & other_a) const
{
return *block == *other_a.block;
bool equal (false);
if (block != nullptr && other_a.block != nullptr)
{
equal = *block == *other_a.block;
}
else if (!roots_hashes.empty () && !other_a.roots_hashes.empty ())
{
equal = roots_hashes == other_a.roots_hashes;
}
return equal;
}
std::string nano::confirm_req::roots_string () const
{
std::string result;
for (auto & root_hash : roots_hashes)
{
result += root_hash.first.to_string ();
result += ":";
result += root_hash.second.to_string ();
result += ", ";
}
return result;
}
nano::confirm_ack::confirm_ack (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::vote_uniquer * uniquer_a) :

View file

@ -297,11 +297,15 @@ class confirm_req : public message
public:
confirm_req (bool &, nano::stream &, nano::message_header const &, nano::block_uniquer * = nullptr);
confirm_req (std::shared_ptr<nano::block>);
confirm_req (std::vector<std::pair<nano::block_hash, nano::block_hash>> const &);
confirm_req (nano::block_hash const &, nano::block_hash const &);
bool deserialize (nano::stream &, nano::block_uniquer * = nullptr);
void serialize (nano::stream &) const override;
void visit (nano::message_visitor &) const override;
bool operator== (nano::confirm_req const &) const;
std::shared_ptr<nano::block> block;
std::vector<std::pair<nano::block_hash, nano::block_hash>> roots_hashes;
std::string roots_string () const;
};
class confirm_ack : public message
{

View file

@ -235,7 +235,7 @@ void nano::network::republish (nano::block_hash const & hash_a, std::shared_ptr<
BOOST_LOG (node.log) << boost::str (boost::format ("Publishing %1% to %2%") % hash_a.to_string () % endpoint_a);
}
std::weak_ptr<nano::node> node_w (node.shared ());
send_buffer (buffer_a->data (), buffer_a->size (), endpoint_a, [buffer_a, node_w, endpoint_a](boost::system::error_code const & ec, size_t size) {
send_buffer (buffer_a->data (), buffer_a->size (), endpoint_a, [node_w, endpoint_a](boost::system::error_code const & ec, size_t size) {
if (auto node_l = node_w.lock ())
{
if (ec && node_l->config.logging.network_logging ())
@ -289,6 +289,23 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a
return result;
}
void nano::network::confirm_hashes (nano::transaction const & transaction_a, nano::endpoint const & peer_a, std::vector<nano::block_hash> blocks_bundle_a)
{
if (node.config.enable_voting)
{
node.wallets.foreach_representative (transaction_a, [this, &blocks_bundle_a, &peer_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
auto vote (this->node.store.vote_generate (transaction_a, pub_a, prv_a, blocks_bundle_a));
nano::confirm_ack confirm (vote);
std::shared_ptr<std::vector<uint8_t>> bytes (new std::vector<uint8_t>);
{
nano::vectorstream stream (*bytes);
confirm.serialize (stream);
}
this->node.network.confirm_send (confirm, bytes, peer_a);
});
}
}
void nano::network::republish_block (std::shared_ptr<nano::block> block)
{
auto hash (block->hash ());
@ -305,6 +322,22 @@ void nano::network::republish_block (std::shared_ptr<nano::block> block)
}
}
void nano::network::republish_block (std::shared_ptr<nano::block> block, nano::endpoint const & peer_a)
{
auto hash (block->hash ());
nano::publish message (block);
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
message.serialize (stream);
}
republish (hash, std::make_shared<std::vector<uint8_t>> (bytes), peer_a);
if (node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% was republished to peer") % hash.to_string ());
}
}
void nano::network::republish_block_batch (std::deque<std::shared_ptr<nano::block>> blocks_a, unsigned delay_a)
{
auto block (blocks_a.front ());
@ -394,6 +427,43 @@ void nano::network::broadcast_confirm_req_base (std::shared_ptr<nano::block> blo
}
}
void nano::network::broadcast_confirm_req_batch (std::unordered_map<nano::endpoint, std::vector<std::pair<nano::block_hash, nano::block_hash>>> request_bundle_a, unsigned delay_a, bool resumption)
{
const size_t max_reps = 10;
if (!resumption && node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Broadcasting batch confirm req to %1% representatives") % request_bundle_a.size ());
}
auto count (0);
while (!request_bundle_a.empty () && count < max_reps)
{
auto j (request_bundle_a.begin ());
count++;
std::vector<std::pair<nano::block_hash, nano::block_hash>> roots_hashes;
// Limit max request size hash + root to 6 pairs
while (roots_hashes.size () <= confirm_req_hashes_max && !j->second.empty ())
{
roots_hashes.push_back (j->second.back ());
j->second.pop_back ();
}
send_confirm_req_hashes (j->first, roots_hashes);
if (j->second.empty ())
{
request_bundle_a.erase (j);
}
}
if (!request_bundle_a.empty ())
{
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, request_bundle_a, delay_a]() {
if (auto node_l = node_w.lock ())
{
node_l->network.broadcast_confirm_req_batch (request_bundle_a, delay_a + 50, true);
}
});
}
}
void nano::network::broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::peer_information>>>> deque_a, unsigned delay_a)
{
auto pair (deque_a.front ());
@ -440,6 +510,31 @@ void nano::network::send_confirm_req (nano::endpoint const & endpoint_a, std::sh
});
}
void nano::network::send_confirm_req_hashes (nano::endpoint const & endpoint_a, std::vector<std::pair<nano::block_hash, nano::block_hash>> const & roots_hashes_a)
{
nano::confirm_req message (roots_hashes_a);
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
message.serialize (stream);
}
if (node.config.logging.network_message_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Sending confirm req hashes to %1%") % endpoint_a);
}
std::weak_ptr<nano::node> node_w (node.shared ());
node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::out);
send_buffer (bytes.data (), bytes.size (), endpoint_a, [node_w](boost::system::error_code const & ec, size_t size) {
if (auto node_l = node_w.lock ())
{
if (ec && node_l->config.logging.network_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Error sending confirm request: %1%") % ec.message ());
}
}
});
}
template <typename T>
void rep_query (nano::node & node_a, T const & peers_a)
{
@ -515,7 +610,14 @@ public:
{
if (node.config.logging.network_message_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Confirm_req message from %1% for %2%") % sender % message_a.block->hash ().to_string ());
if (!message_a.roots_hashes.empty ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Confirm_req message from %1% for hashes:roots %2%") % sender % message_a.roots_string ());
}
else
{
BOOST_LOG (node.log) << boost::str (boost::format ("Confirm_req message from %1% for %2%") % sender % message_a.block->hash ().to_string ());
}
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::in);
node.peers.contacted (sender, message_a.header.version_using);
@ -523,11 +625,50 @@ public:
if (node.config.enable_voting)
{
auto transaction (node.store.tx_begin_read ());
auto successor (node.ledger.successor (transaction, nano::uint512_union (message_a.block->previous (), message_a.block->root ())));
if (successor != nullptr)
if (message_a.block != nullptr)
{
auto same_block (successor->hash () == message_a.block->hash ());
confirm_block (transaction, node, sender, std::move (successor), !same_block);
auto successor (node.ledger.successor (transaction, nano::uint512_union (message_a.block->previous (), message_a.block->root ())));
if (successor != nullptr)
{
auto same_block (successor->hash () == message_a.block->hash ());
confirm_block (transaction, node, sender, std::move (successor), !same_block);
}
}
else if (!message_a.roots_hashes.empty ())
{
std::vector<nano::block_hash> blocks_bundle;
for (auto & root_hash : message_a.roots_hashes)
{
if (node.store.block_exists (transaction, root_hash.first))
{
blocks_bundle.push_back (root_hash.first);
}
else
{
nano::block_hash successor (0);
// Search for block root
successor = node.store.block_successor (transaction, root_hash.second);
// Search for account root
if (successor.is_zero () && node.store.account_exists (transaction, root_hash.second))
{
nano::account_info info;
auto error (node.store.account_get (transaction, root_hash.second, info));
assert (!error);
successor = info.open_block;
}
if (!successor.is_zero ())
{
blocks_bundle.push_back (successor);
auto successor_block (node.store.block_get (transaction, successor));
assert (successor_block != nullptr);
node.network.republish_block (std::move (successor_block), sender);
}
}
}
if (!blocks_bundle.empty ())
{
node.network.confirm_hashes (transaction, sender, blocks_bundle);
}
}
}
}
@ -3305,6 +3446,7 @@ void nano::active_transactions::request_confirm (std::unique_lock<std::mutex> &
auto transaction (node.store.tx_begin_read ());
unsigned unconfirmed_count (0);
unsigned unconfirmed_announcements (0);
std::unordered_map<nano::endpoint, std::vector<std::pair<nano::block_hash, nano::block_hash>>> requests_bundle;
std::deque<std::shared_ptr<nano::block>> rebroadcast_bundle;
std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::peer_information>>>> confirm_req_bundle;
@ -3425,15 +3567,60 @@ void nano::active_transactions::request_confirm (std::unique_lock<std::mutex> &
}
if ((!reps->empty () && total_weight > node.config.online_weight_minimum.number ()) || roots_size > 5)
{
if (confirm_req_bundle.size () < max_broadcast_queue)
// broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing
if (nano::nano_network != nano::nano_networks::nano_test_network)
{
confirm_req_bundle.push_back (std::make_pair (i->election->status.winner, reps));
if (confirm_req_bundle.size () < max_broadcast_queue)
{
confirm_req_bundle.push_back (std::make_pair (i->election->status.winner, reps));
}
}
else
{
for (auto & rep : *reps)
{
auto rep_request (requests_bundle.find (rep.endpoint));
auto block (i->election->status.winner);
auto root_hash (std::make_pair (block->hash (), block->root ()));
if (rep_request == requests_bundle.end ())
{
if (requests_bundle.size () < max_broadcast_queue)
{
std::vector<std::pair<nano::block_hash, nano::block_hash>> insert_vector = { root_hash };
requests_bundle.insert (std::make_pair (rep.endpoint, insert_vector));
}
}
else if (rep_request->second.size () < max_broadcast_queue * nano::network::confirm_req_hashes_max)
{
rep_request->second.push_back (root_hash);
}
}
}
}
else
{
// broadcast request to all peers
confirm_req_bundle.push_back (std::make_pair (i->election->status.winner, std::make_shared<std::vector<nano::peer_information>> (node.peers.list_vector (100))));
if (nano::nano_network != nano::nano_networks::nano_test_network)
{
confirm_req_bundle.push_back (std::make_pair (i->election->status.winner, std::make_shared<std::vector<nano::peer_information>> (node.peers.list_vector (100))));
}
else
{
for (auto & rep : *reps)
{
auto rep_request (requests_bundle.find (rep.endpoint));
auto block (i->election->status.winner);
auto root_hash (std::make_pair (block->hash (), block->root ()));
if (rep_request == requests_bundle.end ())
{
std::vector<std::pair<nano::block_hash, nano::block_hash>> insert_vector = { root_hash };
requests_bundle.insert (std::make_pair (rep.endpoint, insert_vector));
}
else
{
rep_request->second.push_back (root_hash);
}
}
}
}
}
}
@ -3445,6 +3632,11 @@ void nano::active_transactions::request_confirm (std::unique_lock<std::mutex> &
{
node.network.republish_block_batch (rebroadcast_bundle);
}
// Batch confirmation request
if (nano::nano_network != nano::nano_networks::nano_live_network && !requests_bundle.empty ())
{
node.network.broadcast_confirm_req_batch (requests_bundle, 50);
}
//confirm_req broadcast
if (!confirm_req_bundle.empty ())
{

View file

@ -300,6 +300,7 @@ public:
void rpc_action (boost::system::error_code const &, size_t);
void republish_vote (std::shared_ptr<nano::vote>);
void republish_block (std::shared_ptr<nano::block>);
void republish_block (std::shared_ptr<nano::block>, nano::endpoint const &);
static unsigned const broadcast_interval_ms = 10;
void republish_block_batch (std::deque<std::shared_ptr<nano::block>>, unsigned = broadcast_interval_ms);
void republish (nano::block_hash const &, std::shared_ptr<std::vector<uint8_t>>, nano::endpoint);
@ -309,8 +310,11 @@ public:
void send_node_id_handshake (nano::endpoint const &, boost::optional<nano::uint256_union> const & query, boost::optional<nano::uint256_union> const & respond_to);
void broadcast_confirm_req (std::shared_ptr<nano::block>);
void broadcast_confirm_req_base (std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::peer_information>>, unsigned, bool = false);
void broadcast_confirm_req_batch (std::unordered_map<nano::endpoint, std::vector<std::pair<nano::block_hash, nano::block_hash>>>, unsigned = broadcast_interval_ms, bool = false);
void broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::peer_information>>>>, unsigned = broadcast_interval_ms);
void send_confirm_req (nano::endpoint const &, std::shared_ptr<nano::block>);
void send_confirm_req_hashes (nano::endpoint const &, std::vector<std::pair<nano::block_hash, nano::block_hash>> const &);
void confirm_hashes (nano::transaction const &, nano::endpoint const &, std::vector<nano::block_hash>);
void send_buffer (uint8_t const *, size_t, nano::endpoint const &, std::function<void(boost::system::error_code const &, size_t)>);
nano::endpoint endpoint ();
nano::udp_buffer buffer_container;
@ -322,6 +326,7 @@ public:
bool on;
static uint16_t const node_port = nano::nano_network == nano::nano_networks::nano_live_network ? 7075 : 54000;
static size_t const buffer_size = 512;
static size_t const confirm_req_hashes_max = 6;
};
class node_init

View file

@ -35,7 +35,7 @@ struct hash<::nano::uint512_union>
}
namespace nano
{
const uint8_t protocol_version = 0x0f;
const uint8_t protocol_version = 0x10;
const uint8_t protocol_version_min = 0x0d;
const uint8_t node_id_version = 0x0c;