From 559c3352a470b9bf977e63b748739dec1db23def Mon Sep 17 00:00:00 2001 From: Roy Keene Date: Thu, 9 Aug 2018 21:57:18 -0500 Subject: [PATCH] Bulk pull account (#1039) Added "bulk_pull_account" bootstrapping request --- rai/core_test/message_parser.cpp | 6 + rai/node/bootstrap.cpp | 337 +++++++++++++++++++++++++++++++ rai/node/bootstrap.hpp | 21 ++ rai/node/common.cpp | 42 ++++ rai/node/common.hpp | 21 +- rai/node/node.cpp | 4 + rai/node/stats.cpp | 3 + rai/node/stats.hpp | 1 + 8 files changed, 434 insertions(+), 1 deletion(-) diff --git a/rai/core_test/message_parser.cpp b/rai/core_test/message_parser.cpp index 1e15b8e2..e5ae0db6 100644 --- a/rai/core_test/message_parser.cpp +++ b/rai/core_test/message_parser.cpp @@ -12,6 +12,7 @@ public: confirm_req_count (0), confirm_ack_count (0), bulk_pull_count (0), + bulk_pull_account_count (0), bulk_pull_blocks_count (0), bulk_push_count (0), frontier_req_count (0) @@ -37,6 +38,10 @@ public: { ++bulk_pull_count; } + void bulk_pull_account (rai::bulk_pull_account const &) override + { + ++bulk_pull_account_count; + } void bulk_pull_blocks (rai::bulk_pull_blocks const &) override { ++bulk_pull_blocks_count; @@ -58,6 +63,7 @@ public: uint64_t confirm_req_count; uint64_t confirm_ack_count; uint64_t bulk_pull_count; + uint64_t bulk_pull_account_count; uint64_t bulk_pull_blocks_count; uint64_t bulk_push_count; uint64_t frontier_req_count; diff --git a/rai/node/bootstrap.cpp b/rai/node/bootstrap.cpp index c8f96f42..2dec41a2 100644 --- a/rai/node/bootstrap.cpp +++ b/rai/node/bootstrap.cpp @@ -1336,6 +1336,15 @@ void rai::bootstrap_server::receive_header_action (boost::system::error_code con }); break; } + case rai::message_type::bulk_pull_account: + { + node->stats.inc (rai::stat::type::bootstrap, rai::stat::detail::bulk_pull_account, rai::stat::dir::in); + auto this_l (shared_from_this ()); + socket->async_read (receive_buffer, sizeof (rai::uint256_union) + sizeof (rai::uint128_union) + sizeof (uint8_t), [this_l, header](boost::system::error_code const & ec, size_t size_a) { + this_l->receive_bulk_pull_account_action (ec, size_a, header); + }); + break; + } case rai::message_type::bulk_pull_blocks: { node->stats.inc (rai::stat::type::bootstrap, rai::stat::detail::bulk_pull_blocks, rai::stat::dir::in); @@ -1399,6 +1408,26 @@ void rai::bootstrap_server::receive_bulk_pull_action (boost::system::error_code } } +void rai::bootstrap_server::receive_bulk_pull_account_action (boost::system::error_code const & ec, size_t size_a, rai::message_header const & header_a) +{ + if (!ec) + { + auto error (false); + assert (size_a == (sizeof (rai::uint256_union) + sizeof (rai::uint128_union) + sizeof (uint8_t))); + rai::bufferstream stream (receive_buffer->data (), size_a); + std::unique_ptr request (new rai::bulk_pull_account (error, stream, header_a)); + if (!error) + { + if (node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (node->log) << boost::str (boost::format ("Received bulk pull account for %1% with a minimum amount of %2%") % request->account.to_account () % rai::amount (request->minimum_amount).format_balance (rai::Mxrb_ratio, 10, true)); + } + add_request (std::unique_ptr (request.release ())); + receive (); + } + } +} + void rai::bootstrap_server::receive_bulk_pull_blocks_action (boost::system::error_code const & ec, size_t size_a, rai::message_header const & header_a) { if (!ec) @@ -1496,6 +1525,11 @@ public: auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); response->send_next (); } + void bulk_pull_account (rai::bulk_pull_account const &) override + { + auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); + response->send_frontier (); + } void bulk_pull_blocks (rai::bulk_pull_blocks const &) override { auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); @@ -1734,6 +1768,309 @@ send_buffer (std::make_shared> ()) set_current_end (); } +/** + * Bulk pull blocks related to an account + */ +void rai::bulk_pull_account_server::set_params () +{ + assert (request != nullptr); + + /* + * Parse the flags + */ + invalid_request = false; + if (request->flags == rai::bulk_pull_account_flags::pending_address_only) + { + pending_address_only = true; + } + else if (request->flags == rai::bulk_pull_account_flags::pending_hash_and_amount) + { + pending_address_only = false; + } + else + { + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Invalid bulk_pull_account flags supplied %1%") % static_cast (request->flags)); + } + + invalid_request = true; + + return; + } + + /* + * Initialize the current item from the requested account + */ + current_key.account = request->account; + current_key.hash = 0; +} + +void rai::bulk_pull_account_server::send_frontier () +{ + /* + * This function is really the entry point into this class, + * so handle the invalid_request case by terminating the + * request without any response + */ + if (invalid_request) + { + connection->finish_request (); + + return; + } + + /* + * Supply the account frontier + */ + /** + ** Establish a database transaction + **/ + rai::transaction stream_transaction (connection->node->store.environment, nullptr, false); + + /** + ** Get account balance and frontier block hash + **/ + auto account_frontier_hash (connection->node->ledger.latest (stream_transaction, request->account)); + auto account_frontier_balance_int (connection->node->ledger.account_balance (stream_transaction, request->account)); + rai::uint128_union account_frontier_balance (account_frontier_balance_int); + + /** + ** Write the frontier block hash and balance into a buffer + **/ + send_buffer->clear (); + { + rai::vectorstream output_stream (*send_buffer); + + write (output_stream, account_frontier_hash.bytes); + write (output_stream, account_frontier_balance.bytes); + } + + /** + ** Send the buffer to the requestor + **/ + auto this_l (shared_from_this ()); + connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { + this_l->sent_action (ec, size_a); + }); +} + +void rai::bulk_pull_account_server::send_next_block () +{ + /* + * Get the next item from the queue, it is a tuple with the key (which + * contains the account and hash) and data (which contains the amount) + */ + auto block_data (get_next ()); + auto block_info_key (block_data.first.get ()); + auto block_info (block_data.second.get ()); + + if (block_info_key != nullptr) + { + /* + * If we have a new item, emit it to the socket + */ + send_buffer->clear (); + + if (pending_address_only) + { + rai::vectorstream output_stream (*send_buffer); + + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending address: %1%") % block_info->source.to_string ()); + } + + write (output_stream, block_info->source.bytes); + } + else + { + rai::vectorstream output_stream (*send_buffer); + + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending block: %1%") % block_info_key->hash.to_string ()); + } + + write (output_stream, block_info_key->hash.bytes); + write (output_stream, block_info->amount.bytes); + } + + auto this_l (shared_from_this ()); + connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { + this_l->sent_action (ec, size_a); + }); + } + else + { + /* + * Otherwise, finalize the connection + */ + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Done sending blocks")); + } + + send_finished (); + } +} + +std::pair, std::unique_ptr> rai::bulk_pull_account_server::get_next () +{ + std::pair, std::unique_ptr> result; + + while (true) + { + /* + * For each iteration of this loop, establish and then + * destroy a database transaction, to avoid locking the + * database for a prolonged period. + */ + rai::transaction stream_transaction (connection->node->store.environment, nullptr, false); + auto stream (connection->node->store.pending_begin (stream_transaction, current_key)); + + if (stream->first == nullptr) + { + break; + } + + rai::pending_key key (stream->first); + rai::pending_info info (stream->second); + + /* + * Get the key for the next value, to use in the next call or iteration + */ + current_key.account = key.account; + current_key.hash = key.hash.number () + 1; + + /* + * Finish up if the response is for a different account + */ + if (key.account != request->account) + { + break; + } + + /* + * Skip entries where the amount is less than the requested + * minimum + */ + if (info.amount < request->minimum_amount) + { + continue; + } + + /* + * If the pending_address_only flag is set, de-duplicate the + * responses. The responses are the address of the sender, + * so they are are part of the pending table's information + * and not key, so we have to de-duplicate them manually. + */ + if (pending_address_only) + { + if (deduplication.count (info.source) != 0) + { + continue; + } + + deduplication.insert ({ info.source, true }); + } + + result.first = std::unique_ptr (new rai::pending_key (key)); + result.second = std::unique_ptr (new rai::pending_info (info)); + + break; + } + + return result; +} + +void rai::bulk_pull_account_server::sent_action (boost::system::error_code const & ec, size_t size_a) +{ + if (!ec) + { + send_next_block (); + } + else + { + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ()); + } + } +} + +void rai::bulk_pull_account_server::send_finished () +{ + /* + * The "bulk_pull_account" final sequence is a final block of all + * zeros. If we are sending only account public keys (with the + * "pending_address_only" flag) then it will be 256-bits of zeros, + * otherwise it will be 384-bits of zeros. + */ + send_buffer->clear (); + + { + rai::vectorstream output_stream (*send_buffer); + rai::uint256_union account_zero (0); + rai::uint128_union balance_zero (0); + + write (output_stream, account_zero.bytes); + + if (!pending_address_only) + { + write (output_stream, balance_zero.bytes); + } + } + + auto this_l (shared_from_this ()); + + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << "Bulk sending for an account finished"; + } + + connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { + this_l->complete (ec, size_a); + }); +} + +void rai::bulk_pull_account_server::complete (boost::system::error_code const & ec, size_t size_a) +{ + if (!ec) + { + if (pending_address_only) + { + assert (size_a == 32); + } + else + { + assert (size_a == 48); + } + + connection->finish_request (); + } + else + { + if (connection->node->config.logging.bulk_pull_logging ()) + { + BOOST_LOG (connection->node->log) << "Unable to pending-as-zero"; + } + } +} + +rai::bulk_pull_account_server::bulk_pull_account_server (std::shared_ptr const & connection_a, std::unique_ptr request_a) : +connection (connection_a), +request (std::move (request_a)), +send_buffer (std::make_shared> ()), +current_key (0, 0) +{ + /* + * Setup the streaming response for the first call to "send_frontier" and "send_next_block" + */ + set_params (); +} + /** * Bulk pull of a range of blocks, or a checksum for a range of * blocks [min_hash, max_hash) up to a max of max_count. mode diff --git a/rai/node/bootstrap.hpp b/rai/node/bootstrap.hpp index 1dbc6aa0..e8667a7d 100644 --- a/rai/node/bootstrap.hpp +++ b/rai/node/bootstrap.hpp @@ -218,6 +218,7 @@ public: void receive (); void receive_header_action (boost::system::error_code const &, size_t); void receive_bulk_pull_action (boost::system::error_code const &, size_t, rai::message_header const &); + void receive_bulk_pull_account_action (boost::system::error_code const &, size_t, rai::message_header const &); void receive_bulk_pull_blocks_action (boost::system::error_code const &, size_t, rai::message_header const &); void receive_frontier_req_action (boost::system::error_code const &, size_t, rai::message_header const &); void receive_bulk_push_action (); @@ -247,6 +248,26 @@ public: rai::block_hash current; bool include_start; }; +class bulk_pull_account; +class bulk_pull_account_server : public std::enable_shared_from_this +{ +public: + bulk_pull_account_server (std::shared_ptr const &, std::unique_ptr); + void set_params (); + std::pair, std::unique_ptr> get_next (); + void send_frontier (); + void send_next_block (); + void sent_action (boost::system::error_code const &, size_t); + void send_finished (); + void complete (boost::system::error_code const &, size_t); + std::shared_ptr connection; + std::unique_ptr request; + std::shared_ptr> send_buffer; + std::unordered_map deduplication; + rai::pending_key current_key; + bool pending_address_only; + bool invalid_request; +}; class bulk_pull_blocks; class bulk_pull_blocks_server : public std::enable_shared_from_this { diff --git a/rai/node/common.cpp b/rai/node/common.cpp index bb99ee4a..459b3aa1 100644 --- a/rai/node/common.cpp +++ b/rai/node/common.cpp @@ -509,6 +509,48 @@ void rai::bulk_pull::serialize (rai::stream & stream_a) write (stream_a, end); } +rai::bulk_pull_account::bulk_pull_account () : +message (rai::message_type::bulk_pull_account) +{ +} + +rai::bulk_pull_account::bulk_pull_account (bool & error_a, rai::stream & stream_a, rai::message_header const & header_a) : +message (header_a) +{ + if (!error_a) + { + error_a = deserialize (stream_a); + } +} + +void rai::bulk_pull_account::visit (rai::message_visitor & visitor_a) const +{ + visitor_a.bulk_pull_account (*this); +} + +bool rai::bulk_pull_account::deserialize (rai::stream & stream_a) +{ + assert (header.type == rai::message_type::bulk_pull_account); + auto result (read (stream_a, account)); + if (!result) + { + result = read (stream_a, minimum_amount); + if (!result) + { + result = read (stream_a, flags); + } + } + return result; +} + +void rai::bulk_pull_account::serialize (rai::stream & stream_a) +{ + header.serialize (stream_a); + write (stream_a, account); + write (stream_a, minimum_amount); + write (stream_a, flags); +} + rai::bulk_pull_blocks::bulk_pull_blocks () : message (rai::message_type::bulk_pull_blocks) { diff --git a/rai/node/common.hpp b/rai/node/common.hpp index 7df053ce..fb2bc150 100644 --- a/rai/node/common.hpp +++ b/rai/node/common.hpp @@ -140,13 +140,19 @@ enum class message_type : uint8_t bulk_push = 0x7, frontier_req = 0x8, bulk_pull_blocks = 0x9, - node_id_handshake = 0x0a + node_id_handshake = 0x0a, + bulk_pull_account = 0x0b }; enum class bulk_pull_blocks_mode : uint8_t { list_blocks, checksum_blocks }; +enum class bulk_pull_account_flags : uint8_t +{ + pending_hash_and_amount = 0x0, + pending_address_only = 0x1 +}; class message_visitor; class message_header { @@ -277,6 +283,18 @@ public: rai::uint256_union start; rai::block_hash end; }; +class bulk_pull_account : public message +{ +public: + bulk_pull_account (); + bulk_pull_account (bool &, rai::stream &, rai::message_header const &); + bool deserialize (rai::stream &) override; + void serialize (rai::stream &) override; + void visit (rai::message_visitor &) const override; + rai::uint256_union account; + rai::uint128_union minimum_amount; + bulk_pull_account_flags flags; +}; class bulk_pull_blocks : public message { public: @@ -321,6 +339,7 @@ public: virtual void confirm_req (rai::confirm_req const &) = 0; virtual void confirm_ack (rai::confirm_ack const &) = 0; virtual void bulk_pull (rai::bulk_pull const &) = 0; + virtual void bulk_pull_account (rai::bulk_pull_account const &) = 0; virtual void bulk_pull_blocks (rai::bulk_pull_blocks const &) = 0; virtual void bulk_push (rai::bulk_push const &) = 0; virtual void frontier_req (rai::frontier_req const &) = 0; diff --git a/rai/node/node.cpp b/rai/node/node.cpp index 6991ae80..d219d2de 100644 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -422,6 +422,10 @@ public: { assert (false); } + void bulk_pull_account (rai::bulk_pull_account const &) override + { + assert (false); + } void bulk_pull_blocks (rai::bulk_pull_blocks const &) override { assert (false); diff --git a/rai/node/stats.cpp b/rai/node/stats.cpp index f4ce63ca..f1d18665 100644 --- a/rai/node/stats.cpp +++ b/rai/node/stats.cpp @@ -362,6 +362,9 @@ std::string rai::stat::detail_to_string (uint32_t key) case rai::stat::detail::bulk_pull: res = "bulk_pull"; break; + case rai::stat::detail::bulk_pull_account: + res = "bulk_pull_account"; + break; case rai::stat::detail::bulk_pull_blocks: res = "bulk_pull_blocks"; break; diff --git a/rai/node/stats.hpp b/rai/node/stats.hpp index 15b589d1..929b4131 100644 --- a/rai/node/stats.hpp +++ b/rai/node/stats.hpp @@ -216,6 +216,7 @@ public: initiate, bulk_pull, bulk_push, + bulk_pull_account, bulk_pull_blocks, frontier_req,