Add a new message type "bulk_pull_blocks"

* Added start of support for "bulk_pull_blocks" command

* Added support for sending a limited number of hashes

* Added a checksum computng mode where the hash from each selected block is XORed together to give a simple checksum

* Add stub function for bulk_pull_blocks into the testing harness

* Updated comments to be more Doxygen-friendly

* Fixed formatting issues

* Added a constant for the size of a message header

* Fixed issue with size of std::bitset not being calculated correctly

* Changed rai::bootstrap_message_header_size to a translation-unit local variable
This commit is contained in:
Roy Keene 2018-01-20 14:44:05 -06:00 committed by androm3da
commit 1424334957
6 changed files with 324 additions and 1 deletions

View file

@ -12,6 +12,7 @@ public:
confirm_req_count (0),
confirm_ack_count (0),
bulk_pull_count (0),
bulk_pull_blocks_count (0),
bulk_push_count (0),
frontier_req_count (0)
{
@ -36,6 +37,10 @@ public:
{
++bulk_pull_count;
}
void bulk_pull_blocks (rai::bulk_pull_blocks const &)
{
++bulk_pull_blocks_count;
}
void bulk_push (rai::bulk_push const &)
{
++bulk_push_count;
@ -49,6 +54,7 @@ public:
uint64_t confirm_req_count;
uint64_t confirm_ack_count;
uint64_t bulk_pull_count;
uint64_t bulk_pull_blocks_count;
uint64_t bulk_push_count;
uint64_t frontier_req_count;
};

View file

@ -1267,6 +1267,14 @@ void rai::bootstrap_server::receive_header_action (boost::system::error_code con
});
break;
}
case rai::message_type::bulk_pull_blocks:
{
auto this_l (shared_from_this ());
boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data () + rai::bootstrap_message_header_size, sizeof (rai::uint256_union) + sizeof (rai::uint256_union) + sizeof (bulk_pull_blocks_mode) + sizeof (uint32_t)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_bulk_pull_blocks_action (ec, size_a);
});
break;
}
case rai::message_type::frontier_req:
{
auto this_l (shared_from_this ());
@ -1319,6 +1327,25 @@ void rai::bootstrap_server::receive_bulk_pull_action (boost::system::error_code
}
}
void rai::bootstrap_server::receive_bulk_pull_blocks_action (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
std::unique_ptr<rai::bulk_pull_blocks> request (new rai::bulk_pull_blocks);
rai::bufferstream stream (receive_buffer.data (), 8 + sizeof (rai::uint256_union) + sizeof (rai::uint256_union) + sizeof (bulk_pull_blocks_mode) + sizeof (uint32_t));
auto error (request->deserialize (stream));
if (!error)
{
if (node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (node->log) << boost::str (boost::format ("Received bulk pull blocks for %1% to %2%") % request->min_hash.to_string () % request->max_hash.to_string ());
}
add_request (std::unique_ptr<rai::message> (request.release ()));
receive ();
}
}
}
void rai::bootstrap_server::receive_frontier_req_action (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
@ -1399,6 +1426,11 @@ public:
auto response (std::make_shared<rai::bulk_pull_server> (connection, std::unique_ptr<rai::bulk_pull> (static_cast<rai::bulk_pull *> (connection->requests.front ().release ()))));
response->send_next ();
}
void bulk_pull_blocks (rai::bulk_pull_blocks const &) override
{
auto response (std::make_shared<rai::bulk_pull_blocks_server> (connection, std::unique_ptr<rai::bulk_pull_blocks> (static_cast<rai::bulk_pull_blocks *> (connection->requests.front ().release ()))));
response->send_next ();
}
void bulk_push (rai::bulk_push const &) override
{
auto response (std::make_shared<rai::bulk_push_server> (connection));
@ -1420,6 +1452,11 @@ void rai::bootstrap_server::run_next ()
requests.front ()->visit (visitor);
}
/**
* Handle a request for the pull of all blocks associated with an account
* The account is supplied as the "start" member, and the final block to
* send is the "end" member
*/
void rai::bulk_pull_server::set_current_end ()
{
assert (request != nullptr);
@ -1561,6 +1598,190 @@ request (std::move (request_a))
set_current_end ();
}
/**
* Bulk pull of a range of blocks, or a checksum for a range of
* blocks [min_hash, max_hash) up to a max of max_count. mode
* specifies whether the list is returned or a single checksum
* of all the hashes. The checksum is computed by XORing the
* hash of all the blocks that would be returned
*/
void rai::bulk_pull_blocks_server::set_params ()
{
assert (request != nullptr);
if (connection->node->config.logging.bulk_pull_logging ())
{
std::string modeName = "<unknown>";
switch (request->mode)
{
case rai::bulk_pull_blocks_mode::list_blocks:
modeName = "list";
break;
case rai::bulk_pull_blocks_mode::checksum_blocks:
modeName = "checksum";
break;
}
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Bulk pull of block range starting, min (%1%) to max (%2%), max_count = %3%, mode = %4%") % request->min_hash.to_string () % request->max_hash.to_string () % request->max_count % modeName);
}
stream = connection->node->store.block_info_begin (stream_transaction, request->min_hash);
if (request->max_hash < request->min_hash)
{
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Bulk pull of block range is invalid, min (%1%) is greater than max (%2%)") % request->min_hash.to_string () % request->max_hash.to_string ());
}
request->max_hash = request->min_hash;
}
}
void rai::bulk_pull_blocks_server::send_next ()
{
std::unique_ptr<rai::block> block (get_next ());
if (block != nullptr)
{
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending block: %1%") % block->hash ().to_string ());
}
send_buffer.clear ();
auto this_l (shared_from_this ());
if (request->mode == rai::bulk_pull_blocks_mode::list_blocks)
{
rai::vectorstream stream (send_buffer);
rai::serialize_block (stream, *block);
}
else if (request->mode == rai::bulk_pull_blocks_mode::checksum_blocks)
{
checksum ^= block->hash ();
}
async_write (*connection->socket, boost::asio::buffer (send_buffer.data (), send_buffer.size ()), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Done sending blocks"));
}
if (request->mode == rai::bulk_pull_blocks_mode::checksum_blocks)
{
{
send_buffer.clear ();
rai::vectorstream stream (send_buffer);
write (stream, static_cast<uint8_t> (rai::block_type::not_a_block));
write (stream, checksum);
}
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending checksum: %1%") % checksum.to_string ());
}
async_write (*connection->socket, boost::asio::buffer (send_buffer.data (), send_buffer.size ()), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->send_finished ();
});
}
else
{
send_finished ();
}
}
}
std::unique_ptr<rai::block> rai::bulk_pull_blocks_server::get_next ()
{
std::unique_ptr<rai::block> result;
bool out_of_bounds;
out_of_bounds = false;
if (request->max_count != 0)
{
if (sent_count >= request->max_count)
{
out_of_bounds = true;
}
sent_count++;
}
if (!out_of_bounds)
{
if (stream->first.size () != 0)
{
auto current = stream->first.uint256 ();
if (current < request->max_hash)
{
rai::transaction transaction (connection->node->store.environment, nullptr, false);
result = connection->node->store.block_get (transaction, current);
++stream;
}
}
}
return result;
}
void rai::bulk_pull_blocks_server::sent_action (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
send_next ();
}
else
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ());
}
}
void rai::bulk_pull_blocks_server::send_finished ()
{
send_buffer.clear ();
send_buffer.push_back (static_cast<uint8_t> (rai::block_type::not_a_block));
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << "Bulk sending finished";
}
async_write (*connection->socket, boost::asio::buffer (send_buffer.data (), 1), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->no_block_sent (ec, size_a);
});
}
void rai::bulk_pull_blocks_server::no_block_sent (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
assert (size_a == 1);
connection->finish_request ();
}
else
{
BOOST_LOG (connection->node->log) << "Unable to send not-a-block";
}
}
rai::bulk_pull_blocks_server::bulk_pull_blocks_server (std::shared_ptr<rai::bootstrap_server> const & connection_a, std::unique_ptr<rai::bulk_pull_blocks> request_a) :
connection (connection_a),
request (std::move (request_a)),
stream (nullptr),
stream_transaction (connection_a->node->store.environment, nullptr, false),
sent_count (0),
checksum (0)
{
set_params ();
}
rai::bulk_push_server::bulk_push_server (std::shared_ptr<rai::bootstrap_server> const & connection_a) :
connection (connection_a)
{

View file

@ -21,6 +21,13 @@ enum class sync_result
error,
fork
};
/**
* The length of every message header, parsed by rai::message::read_header ()
* The 2 here represents the size of a std::bitset<16>, which is 2 chars long normally
*/
static const int bootstrap_message_header_size = sizeof (rai::message::magic_number) + sizeof (uint8_t) + sizeof (uint8_t) + sizeof (uint8_t) + sizeof (rai::message_type) + 2;
class block_synchronization
{
public:
@ -206,6 +213,7 @@ public:
void receive ();
void receive_header_action (boost::system::error_code const &, size_t);
void receive_bulk_pull_action (boost::system::error_code const &, size_t);
void receive_bulk_pull_blocks_action (boost::system::error_code const &, size_t);
void receive_frontier_req_action (boost::system::error_code const &, size_t);
void receive_bulk_push_action ();
void add_request (std::unique_ptr<rai::message>);
@ -233,6 +241,25 @@ public:
std::vector<uint8_t> send_buffer;
rai::block_hash current;
};
class bulk_pull_blocks;
class bulk_pull_blocks_server : public std::enable_shared_from_this<rai::bulk_pull_blocks_server>
{
public:
bulk_pull_blocks_server (std::shared_ptr<rai::bootstrap_server> const &, std::unique_ptr<rai::bulk_pull_blocks>);
void set_params ();
std::unique_ptr<rai::block> get_next ();
void send_next ();
void sent_action (boost::system::error_code const &, size_t);
void send_finished ();
void no_block_sent (boost::system::error_code const &, size_t);
std::shared_ptr<rai::bootstrap_server> connection;
std::unique_ptr<rai::bulk_pull_blocks> request;
std::vector<uint8_t> send_buffer;
rai::store_iterator stream;
rai::transaction stream_transaction;
uint32_t sent_count;
rai::block_hash checksum;
};
class bulk_push_server : public std::enable_shared_from_this<rai::bulk_push_server>
{
public:

View file

@ -496,6 +496,52 @@ void rai::bulk_pull::serialize (rai::stream & stream_a)
write (stream_a, end);
}
rai::bulk_pull_blocks::bulk_pull_blocks () :
message (rai::message_type::bulk_pull_blocks)
{
}
void rai::bulk_pull_blocks::visit (rai::message_visitor & visitor_a) const
{
visitor_a.bulk_pull_blocks (*this);
}
bool rai::bulk_pull_blocks::deserialize (rai::stream & stream_a)
{
auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions));
assert (!result);
assert (rai::message_type::bulk_pull_blocks == type);
if (!result)
{
assert (type == rai::message_type::bulk_pull_blocks);
result = read (stream_a, min_hash);
if (!result)
{
result = read (stream_a, max_hash);
}
if (!result)
{
result = read (stream_a, mode);
}
if (!result)
{
result = read (stream_a, max_count);
}
}
return result;
}
void rai::bulk_pull_blocks::serialize (rai::stream & stream_a)
{
write_header (stream_a);
write (stream_a, min_hash);
write (stream_a, max_hash);
write (stream_a, mode);
write (stream_a, max_count);
}
rai::bulk_push::bulk_push () :
message (rai::message_type::bulk_push)
{

View file

@ -92,7 +92,13 @@ enum class message_type : uint8_t
confirm_ack,
bulk_pull,
bulk_push,
frontier_req
frontier_req,
bulk_pull_blocks
};
enum class bulk_pull_blocks_mode : uint8_t
{
list_blocks,
checksum_blocks
};
class message_visitor;
class message
@ -201,6 +207,18 @@ public:
rai::uint256_union start;
rai::block_hash end;
};
class bulk_pull_blocks : public message
{
public:
bulk_pull_blocks ();
bool deserialize (rai::stream &) override;
void serialize (rai::stream &) override;
void visit (rai::message_visitor &) const override;
rai::block_hash min_hash;
rai::block_hash max_hash;
bulk_pull_blocks_mode mode;
uint32_t max_count;
};
class bulk_push : public message
{
public:
@ -217,6 +235,7 @@ public:
virtual void confirm_req (rai::confirm_req const &) = 0;
virtual void confirm_ack (rai::confirm_ack const &) = 0;
virtual void bulk_pull (rai::bulk_pull const &) = 0;
virtual void bulk_pull_blocks (rai::bulk_pull_blocks const &) = 0;
virtual void bulk_push (rai::bulk_push const &) = 0;
virtual void frontier_req (rai::frontier_req const &) = 0;
virtual ~message_visitor ();

View file

@ -405,6 +405,10 @@ public:
{
assert (false);
}
void bulk_pull_blocks (rai::bulk_pull_blocks const &) override
{
assert (false);
}
void bulk_push (rai::bulk_push const &) override
{
assert (false);