diff --git a/rai/core_test/message_parser.cpp b/rai/core_test/message_parser.cpp index ce6cdb45..6fdd1f69 100644 --- a/rai/core_test/message_parser.cpp +++ b/rai/core_test/message_parser.cpp @@ -12,6 +12,7 @@ public: confirm_req_count (0), confirm_ack_count (0), bulk_pull_count (0), + bulk_pull_blocks_count (0), bulk_push_count (0), frontier_req_count (0) { @@ -36,6 +37,10 @@ public: { ++bulk_pull_count; } + void bulk_pull_blocks (rai::bulk_pull_blocks const &) + { + ++bulk_pull_blocks_count; + } void bulk_push (rai::bulk_push const &) { ++bulk_push_count; @@ -49,6 +54,7 @@ public: uint64_t confirm_req_count; uint64_t confirm_ack_count; uint64_t bulk_pull_count; + uint64_t bulk_pull_blocks_count; uint64_t bulk_push_count; uint64_t frontier_req_count; }; diff --git a/rai/node/bootstrap.cpp b/rai/node/bootstrap.cpp index 6358b4c9..51bfb8c4 100644 --- a/rai/node/bootstrap.cpp +++ b/rai/node/bootstrap.cpp @@ -1267,6 +1267,14 @@ void rai::bootstrap_server::receive_header_action (boost::system::error_code con }); break; } + case rai::message_type::bulk_pull_blocks: + { + auto this_l (shared_from_this ()); + boost::asio::async_read (*socket, boost::asio::buffer (receive_buffer.data () + rai::bootstrap_message_header_size, sizeof (rai::uint256_union) + sizeof (rai::uint256_union) + sizeof (bulk_pull_blocks_mode) + sizeof (uint32_t)), [this_l](boost::system::error_code const & ec, size_t size_a) { + this_l->receive_bulk_pull_blocks_action (ec, size_a); + }); + break; + } case rai::message_type::frontier_req: { auto this_l (shared_from_this ()); @@ -1319,6 +1327,25 @@ void rai::bootstrap_server::receive_bulk_pull_action (boost::system::error_code } } +void rai::bootstrap_server::receive_bulk_pull_blocks_action (boost::system::error_code const & ec, size_t size_a) +{ + if (!ec) + { + std::unique_ptr request (new rai::bulk_pull_blocks); + rai::bufferstream stream (receive_buffer.data (), 8 + sizeof (rai::uint256_union) + sizeof (rai::uint256_union) + sizeof (bulk_pull_blocks_mode) + sizeof (uint32_t)); + auto error (request->deserialize (stream)); + if (!error) + { + if (node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (node->log) << boost::str (boost::format ("Received bulk pull blocks for %1% to %2%") % request->min_hash.to_string () % request->max_hash.to_string ()); + } + add_request (std::unique_ptr (request.release ())); + receive (); + } + } +} + void rai::bootstrap_server::receive_frontier_req_action (boost::system::error_code const & ec, size_t size_a) { if (!ec) @@ -1399,6 +1426,11 @@ public: auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); response->send_next (); } + void bulk_pull_blocks (rai::bulk_pull_blocks const &) override + { + auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); + response->send_next (); + } void bulk_push (rai::bulk_push const &) override { auto response (std::make_shared (connection)); @@ -1420,6 +1452,11 @@ void rai::bootstrap_server::run_next () requests.front ()->visit (visitor); } +/** + * Handle a request for the pull of all blocks associated with an account + * The account is supplied as the "start" member, and the final block to + * send is the "end" member + */ void rai::bulk_pull_server::set_current_end () { assert (request != nullptr); @@ -1561,6 +1598,190 @@ request (std::move (request_a)) set_current_end (); } +/** + * Bulk pull of a range of blocks, or a checksum for a range of + * blocks [min_hash, max_hash) up to a max of max_count. mode + * specifies whether the list is returned or a single checksum + * of all the hashes. The checksum is computed by XORing the + * hash of all the blocks that would be returned + */ +void rai::bulk_pull_blocks_server::set_params () +{ + assert (request != nullptr); + + if (connection->node->config.logging.bulk_pull_logging ()) + { + std::string modeName = ""; + + switch (request->mode) + { + case rai::bulk_pull_blocks_mode::list_blocks: + modeName = "list"; + break; + case rai::bulk_pull_blocks_mode::checksum_blocks: + modeName = "checksum"; + break; + } + + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Bulk pull of block range starting, min (%1%) to max (%2%), max_count = %3%, mode = %4%") % request->min_hash.to_string () % request->max_hash.to_string () % request->max_count % modeName); + } + + stream = connection->node->store.block_info_begin (stream_transaction, request->min_hash); + + if (request->max_hash < request->min_hash) + { + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Bulk pull of block range is invalid, min (%1%) is greater than max (%2%)") % request->min_hash.to_string () % request->max_hash.to_string ()); + } + + request->max_hash = request->min_hash; + } +} + +void rai::bulk_pull_blocks_server::send_next () +{ + std::unique_ptr block (get_next ()); + if (block != nullptr) + { + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending block: %1%") % block->hash ().to_string ()); + } + + send_buffer.clear (); + auto this_l (shared_from_this ()); + + if (request->mode == rai::bulk_pull_blocks_mode::list_blocks) + { + rai::vectorstream stream (send_buffer); + rai::serialize_block (stream, *block); + } + else if (request->mode == rai::bulk_pull_blocks_mode::checksum_blocks) + { + checksum ^= block->hash (); + } + + async_write (*connection->socket, boost::asio::buffer (send_buffer.data (), send_buffer.size ()), [this_l](boost::system::error_code const & ec, size_t size_a) { + this_l->sent_action (ec, size_a); + }); + } + else + { + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Done sending blocks")); + } + + if (request->mode == rai::bulk_pull_blocks_mode::checksum_blocks) + { + { + send_buffer.clear (); + rai::vectorstream stream (send_buffer); + write (stream, static_cast (rai::block_type::not_a_block)); + write (stream, checksum); + } + + auto this_l (shared_from_this ()); + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending checksum: %1%") % checksum.to_string ()); + } + + async_write (*connection->socket, boost::asio::buffer (send_buffer.data (), send_buffer.size ()), [this_l](boost::system::error_code const & ec, size_t size_a) { + this_l->send_finished (); + }); + } + else + { + send_finished (); + } + } +} + +std::unique_ptr rai::bulk_pull_blocks_server::get_next () +{ + std::unique_ptr result; + bool out_of_bounds; + + out_of_bounds = false; + if (request->max_count != 0) + { + if (sent_count >= request->max_count) + { + out_of_bounds = true; + } + + sent_count++; + } + + if (!out_of_bounds) + { + if (stream->first.size () != 0) + { + auto current = stream->first.uint256 (); + if (current < request->max_hash) + { + rai::transaction transaction (connection->node->store.environment, nullptr, false); + result = connection->node->store.block_get (transaction, current); + + ++stream; + } + } + } + return result; +} + +void rai::bulk_pull_blocks_server::sent_action (boost::system::error_code const & ec, size_t size_a) +{ + if (!ec) + { + send_next (); + } + else + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ()); + } +} + +void rai::bulk_pull_blocks_server::send_finished () +{ + send_buffer.clear (); + send_buffer.push_back (static_cast (rai::block_type::not_a_block)); + auto this_l (shared_from_this ()); + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << "Bulk sending finished"; + } + async_write (*connection->socket, boost::asio::buffer (send_buffer.data (), 1), [this_l](boost::system::error_code const & ec, size_t size_a) { + this_l->no_block_sent (ec, size_a); + }); +} + +void rai::bulk_pull_blocks_server::no_block_sent (boost::system::error_code const & ec, size_t size_a) +{ + if (!ec) + { + assert (size_a == 1); + connection->finish_request (); + } + else + { + BOOST_LOG (connection->node->log) << "Unable to send not-a-block"; + } +} + +rai::bulk_pull_blocks_server::bulk_pull_blocks_server (std::shared_ptr const & connection_a, std::unique_ptr request_a) : +connection (connection_a), +request (std::move (request_a)), +stream (nullptr), +stream_transaction (connection_a->node->store.environment, nullptr, false), +sent_count (0), +checksum (0) +{ + set_params (); +} + rai::bulk_push_server::bulk_push_server (std::shared_ptr const & connection_a) : connection (connection_a) { diff --git a/rai/node/bootstrap.hpp b/rai/node/bootstrap.hpp index 56baa0c1..ee311a68 100644 --- a/rai/node/bootstrap.hpp +++ b/rai/node/bootstrap.hpp @@ -21,6 +21,13 @@ enum class sync_result error, fork }; + +/** + * The length of every message header, parsed by rai::message::read_header () + * The 2 here represents the size of a std::bitset<16>, which is 2 chars long normally + */ +static const int bootstrap_message_header_size = sizeof (rai::message::magic_number) + sizeof (uint8_t) + sizeof (uint8_t) + sizeof (uint8_t) + sizeof (rai::message_type) + 2; + class block_synchronization { public: @@ -206,6 +213,7 @@ public: void receive (); void receive_header_action (boost::system::error_code const &, size_t); void receive_bulk_pull_action (boost::system::error_code const &, size_t); + void receive_bulk_pull_blocks_action (boost::system::error_code const &, size_t); void receive_frontier_req_action (boost::system::error_code const &, size_t); void receive_bulk_push_action (); void add_request (std::unique_ptr); @@ -233,6 +241,25 @@ public: std::vector send_buffer; rai::block_hash current; }; +class bulk_pull_blocks; +class bulk_pull_blocks_server : public std::enable_shared_from_this +{ +public: + bulk_pull_blocks_server (std::shared_ptr const &, std::unique_ptr); + void set_params (); + std::unique_ptr get_next (); + void send_next (); + void sent_action (boost::system::error_code const &, size_t); + void send_finished (); + void no_block_sent (boost::system::error_code const &, size_t); + std::shared_ptr connection; + std::unique_ptr request; + std::vector send_buffer; + rai::store_iterator stream; + rai::transaction stream_transaction; + uint32_t sent_count; + rai::block_hash checksum; +}; class bulk_push_server : public std::enable_shared_from_this { public: diff --git a/rai/node/common.cpp b/rai/node/common.cpp index e20a6824..7ba651e8 100644 --- a/rai/node/common.cpp +++ b/rai/node/common.cpp @@ -496,6 +496,52 @@ void rai::bulk_pull::serialize (rai::stream & stream_a) write (stream_a, end); } +rai::bulk_pull_blocks::bulk_pull_blocks () : +message (rai::message_type::bulk_pull_blocks) +{ +} + +void rai::bulk_pull_blocks::visit (rai::message_visitor & visitor_a) const +{ + visitor_a.bulk_pull_blocks (*this); +} + +bool rai::bulk_pull_blocks::deserialize (rai::stream & stream_a) +{ + auto result (read_header (stream_a, version_max, version_using, version_min, type, extensions)); + assert (!result); + assert (rai::message_type::bulk_pull_blocks == type); + if (!result) + { + assert (type == rai::message_type::bulk_pull_blocks); + result = read (stream_a, min_hash); + if (!result) + { + result = read (stream_a, max_hash); + } + + if (!result) + { + result = read (stream_a, mode); + } + + if (!result) + { + result = read (stream_a, max_count); + } + } + return result; +} + +void rai::bulk_pull_blocks::serialize (rai::stream & stream_a) +{ + write_header (stream_a); + write (stream_a, min_hash); + write (stream_a, max_hash); + write (stream_a, mode); + write (stream_a, max_count); +} + rai::bulk_push::bulk_push () : message (rai::message_type::bulk_push) { diff --git a/rai/node/common.hpp b/rai/node/common.hpp index 3e3bb9da..4d92fe6b 100644 --- a/rai/node/common.hpp +++ b/rai/node/common.hpp @@ -92,7 +92,13 @@ enum class message_type : uint8_t confirm_ack, bulk_pull, bulk_push, - frontier_req + frontier_req, + bulk_pull_blocks +}; +enum class bulk_pull_blocks_mode : uint8_t +{ + list_blocks, + checksum_blocks }; class message_visitor; class message @@ -201,6 +207,18 @@ public: rai::uint256_union start; rai::block_hash end; }; +class bulk_pull_blocks : public message +{ +public: + bulk_pull_blocks (); + bool deserialize (rai::stream &) override; + void serialize (rai::stream &) override; + void visit (rai::message_visitor &) const override; + rai::block_hash min_hash; + rai::block_hash max_hash; + bulk_pull_blocks_mode mode; + uint32_t max_count; +}; class bulk_push : public message { public: @@ -217,6 +235,7 @@ public: virtual void confirm_req (rai::confirm_req const &) = 0; virtual void confirm_ack (rai::confirm_ack const &) = 0; virtual void bulk_pull (rai::bulk_pull const &) = 0; + virtual void bulk_pull_blocks (rai::bulk_pull_blocks const &) = 0; virtual void bulk_push (rai::bulk_push const &) = 0; virtual void frontier_req (rai::frontier_req const &) = 0; virtual ~message_visitor (); diff --git a/rai/node/node.cpp b/rai/node/node.cpp index 767e050b..276356ee 100644 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -405,6 +405,10 @@ public: { assert (false); } + void bulk_pull_blocks (rai::bulk_pull_blocks const &) override + { + assert (false); + } void bulk_push (rai::bulk_push const &) override { assert (false);