Bulk pull account (#1039)

Added "bulk_pull_account" bootstrapping request
This commit is contained in:
Roy Keene 2018-08-09 21:57:18 -05:00 committed by GitHub
commit 559c3352a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 434 additions and 1 deletions

View file

@ -12,6 +12,7 @@ public:
confirm_req_count (0),
confirm_ack_count (0),
bulk_pull_count (0),
bulk_pull_account_count (0),
bulk_pull_blocks_count (0),
bulk_push_count (0),
frontier_req_count (0)
@ -37,6 +38,10 @@ public:
{
++bulk_pull_count;
}
void bulk_pull_account (rai::bulk_pull_account const &) override
{
++bulk_pull_account_count;
}
void bulk_pull_blocks (rai::bulk_pull_blocks const &) override
{
++bulk_pull_blocks_count;
@ -58,6 +63,7 @@ public:
uint64_t confirm_req_count;
uint64_t confirm_ack_count;
uint64_t bulk_pull_count;
uint64_t bulk_pull_account_count;
uint64_t bulk_pull_blocks_count;
uint64_t bulk_push_count;
uint64_t frontier_req_count;

View file

@ -1336,6 +1336,15 @@ void rai::bootstrap_server::receive_header_action (boost::system::error_code con
});
break;
}
case rai::message_type::bulk_pull_account:
{
node->stats.inc (rai::stat::type::bootstrap, rai::stat::detail::bulk_pull_account, rai::stat::dir::in);
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, sizeof (rai::uint256_union) + sizeof (rai::uint128_union) + sizeof (uint8_t), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_bulk_pull_account_action (ec, size_a, header);
});
break;
}
case rai::message_type::bulk_pull_blocks:
{
node->stats.inc (rai::stat::type::bootstrap, rai::stat::detail::bulk_pull_blocks, rai::stat::dir::in);
@ -1399,6 +1408,26 @@ void rai::bootstrap_server::receive_bulk_pull_action (boost::system::error_code
}
}
void rai::bootstrap_server::receive_bulk_pull_account_action (boost::system::error_code const & ec, size_t size_a, rai::message_header const & header_a)
{
if (!ec)
{
auto error (false);
assert (size_a == (sizeof (rai::uint256_union) + sizeof (rai::uint128_union) + sizeof (uint8_t)));
rai::bufferstream stream (receive_buffer->data (), size_a);
std::unique_ptr<rai::bulk_pull_account> request (new rai::bulk_pull_account (error, stream, header_a));
if (!error)
{
if (node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (node->log) << boost::str (boost::format ("Received bulk pull account for %1% with a minimum amount of %2%") % request->account.to_account () % rai::amount (request->minimum_amount).format_balance (rai::Mxrb_ratio, 10, true));
}
add_request (std::unique_ptr<rai::message> (request.release ()));
receive ();
}
}
}
void rai::bootstrap_server::receive_bulk_pull_blocks_action (boost::system::error_code const & ec, size_t size_a, rai::message_header const & header_a)
{
if (!ec)
@ -1496,6 +1525,11 @@ public:
auto response (std::make_shared<rai::bulk_pull_server> (connection, std::unique_ptr<rai::bulk_pull> (static_cast<rai::bulk_pull *> (connection->requests.front ().release ()))));
response->send_next ();
}
void bulk_pull_account (rai::bulk_pull_account const &) override
{
auto response (std::make_shared<rai::bulk_pull_account_server> (connection, std::unique_ptr<rai::bulk_pull_account> (static_cast<rai::bulk_pull_account *> (connection->requests.front ().release ()))));
response->send_frontier ();
}
void bulk_pull_blocks (rai::bulk_pull_blocks const &) override
{
auto response (std::make_shared<rai::bulk_pull_blocks_server> (connection, std::unique_ptr<rai::bulk_pull_blocks> (static_cast<rai::bulk_pull_blocks *> (connection->requests.front ().release ()))));
@ -1734,6 +1768,309 @@ send_buffer (std::make_shared<std::vector<uint8_t>> ())
set_current_end ();
}
/**
* Bulk pull blocks related to an account
*/
void rai::bulk_pull_account_server::set_params ()
{
assert (request != nullptr);
/*
* Parse the flags
*/
invalid_request = false;
if (request->flags == rai::bulk_pull_account_flags::pending_address_only)
{
pending_address_only = true;
}
else if (request->flags == rai::bulk_pull_account_flags::pending_hash_and_amount)
{
pending_address_only = false;
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Invalid bulk_pull_account flags supplied %1%") % static_cast<uint8_t> (request->flags));
}
invalid_request = true;
return;
}
/*
* Initialize the current item from the requested account
*/
current_key.account = request->account;
current_key.hash = 0;
}
void rai::bulk_pull_account_server::send_frontier ()
{
/*
* This function is really the entry point into this class,
* so handle the invalid_request case by terminating the
* request without any response
*/
if (invalid_request)
{
connection->finish_request ();
return;
}
/*
* Supply the account frontier
*/
/**
** Establish a database transaction
**/
rai::transaction stream_transaction (connection->node->store.environment, nullptr, false);
/**
** Get account balance and frontier block hash
**/
auto account_frontier_hash (connection->node->ledger.latest (stream_transaction, request->account));
auto account_frontier_balance_int (connection->node->ledger.account_balance (stream_transaction, request->account));
rai::uint128_union account_frontier_balance (account_frontier_balance_int);
/**
** Write the frontier block hash and balance into a buffer
**/
send_buffer->clear ();
{
rai::vectorstream output_stream (*send_buffer);
write (output_stream, account_frontier_hash.bytes);
write (output_stream, account_frontier_balance.bytes);
}
/**
** Send the buffer to the requestor
**/
auto this_l (shared_from_this ());
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
void rai::bulk_pull_account_server::send_next_block ()
{
/*
* Get the next item from the queue, it is a tuple with the key (which
* contains the account and hash) and data (which contains the amount)
*/
auto block_data (get_next ());
auto block_info_key (block_data.first.get ());
auto block_info (block_data.second.get ());
if (block_info_key != nullptr)
{
/*
* If we have a new item, emit it to the socket
*/
send_buffer->clear ();
if (pending_address_only)
{
rai::vectorstream output_stream (*send_buffer);
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending address: %1%") % block_info->source.to_string ());
}
write (output_stream, block_info->source.bytes);
}
else
{
rai::vectorstream output_stream (*send_buffer);
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending block: %1%") % block_info_key->hash.to_string ());
}
write (output_stream, block_info_key->hash.bytes);
write (output_stream, block_info->amount.bytes);
}
auto this_l (shared_from_this ());
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
else
{
/*
* Otherwise, finalize the connection
*/
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Done sending blocks"));
}
send_finished ();
}
}
std::pair<std::unique_ptr<rai::pending_key>, std::unique_ptr<rai::pending_info>> rai::bulk_pull_account_server::get_next ()
{
std::pair<std::unique_ptr<rai::pending_key>, std::unique_ptr<rai::pending_info>> result;
while (true)
{
/*
* For each iteration of this loop, establish and then
* destroy a database transaction, to avoid locking the
* database for a prolonged period.
*/
rai::transaction stream_transaction (connection->node->store.environment, nullptr, false);
auto stream (connection->node->store.pending_begin (stream_transaction, current_key));
if (stream->first == nullptr)
{
break;
}
rai::pending_key key (stream->first);
rai::pending_info info (stream->second);
/*
* Get the key for the next value, to use in the next call or iteration
*/
current_key.account = key.account;
current_key.hash = key.hash.number () + 1;
/*
* Finish up if the response is for a different account
*/
if (key.account != request->account)
{
break;
}
/*
* Skip entries where the amount is less than the requested
* minimum
*/
if (info.amount < request->minimum_amount)
{
continue;
}
/*
* If the pending_address_only flag is set, de-duplicate the
* responses. The responses are the address of the sender,
* so they are are part of the pending table's information
* and not key, so we have to de-duplicate them manually.
*/
if (pending_address_only)
{
if (deduplication.count (info.source) != 0)
{
continue;
}
deduplication.insert ({ info.source, true });
}
result.first = std::unique_ptr<rai::pending_key> (new rai::pending_key (key));
result.second = std::unique_ptr<rai::pending_info> (new rai::pending_info (info));
break;
}
return result;
}
void rai::bulk_pull_account_server::sent_action (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
send_next_block ();
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ());
}
}
}
void rai::bulk_pull_account_server::send_finished ()
{
/*
* The "bulk_pull_account" final sequence is a final block of all
* zeros. If we are sending only account public keys (with the
* "pending_address_only" flag) then it will be 256-bits of zeros,
* otherwise it will be 384-bits of zeros.
*/
send_buffer->clear ();
{
rai::vectorstream output_stream (*send_buffer);
rai::uint256_union account_zero (0);
rai::uint128_union balance_zero (0);
write (output_stream, account_zero.bytes);
if (!pending_address_only)
{
write (output_stream, balance_zero.bytes);
}
}
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << "Bulk sending for an account finished";
}
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->complete (ec, size_a);
});
}
void rai::bulk_pull_account_server::complete (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
if (pending_address_only)
{
assert (size_a == 32);
}
else
{
assert (size_a == 48);
}
connection->finish_request ();
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << "Unable to pending-as-zero";
}
}
}
rai::bulk_pull_account_server::bulk_pull_account_server (std::shared_ptr<rai::bootstrap_server> const & connection_a, std::unique_ptr<rai::bulk_pull_account> request_a) :
connection (connection_a),
request (std::move (request_a)),
send_buffer (std::make_shared<std::vector<uint8_t>> ()),
current_key (0, 0)
{
/*
* Setup the streaming response for the first call to "send_frontier" and "send_next_block"
*/
set_params ();
}
/**
* Bulk pull of a range of blocks, or a checksum for a range of
* blocks [min_hash, max_hash) up to a max of max_count. mode

View file

@ -218,6 +218,7 @@ public:
void receive ();
void receive_header_action (boost::system::error_code const &, size_t);
void receive_bulk_pull_action (boost::system::error_code const &, size_t, rai::message_header const &);
void receive_bulk_pull_account_action (boost::system::error_code const &, size_t, rai::message_header const &);
void receive_bulk_pull_blocks_action (boost::system::error_code const &, size_t, rai::message_header const &);
void receive_frontier_req_action (boost::system::error_code const &, size_t, rai::message_header const &);
void receive_bulk_push_action ();
@ -247,6 +248,26 @@ public:
rai::block_hash current;
bool include_start;
};
class bulk_pull_account;
class bulk_pull_account_server : public std::enable_shared_from_this<rai::bulk_pull_account_server>
{
public:
bulk_pull_account_server (std::shared_ptr<rai::bootstrap_server> const &, std::unique_ptr<rai::bulk_pull_account>);
void set_params ();
std::pair<std::unique_ptr<rai::pending_key>, std::unique_ptr<rai::pending_info>> get_next ();
void send_frontier ();
void send_next_block ();
void sent_action (boost::system::error_code const &, size_t);
void send_finished ();
void complete (boost::system::error_code const &, size_t);
std::shared_ptr<rai::bootstrap_server> connection;
std::unique_ptr<rai::bulk_pull_account> request;
std::shared_ptr<std::vector<uint8_t>> send_buffer;
std::unordered_map<rai::uint256_union, bool> deduplication;
rai::pending_key current_key;
bool pending_address_only;
bool invalid_request;
};
class bulk_pull_blocks;
class bulk_pull_blocks_server : public std::enable_shared_from_this<rai::bulk_pull_blocks_server>
{

View file

@ -509,6 +509,48 @@ void rai::bulk_pull::serialize (rai::stream & stream_a)
write (stream_a, end);
}
rai::bulk_pull_account::bulk_pull_account () :
message (rai::message_type::bulk_pull_account)
{
}
rai::bulk_pull_account::bulk_pull_account (bool & error_a, rai::stream & stream_a, rai::message_header const & header_a) :
message (header_a)
{
if (!error_a)
{
error_a = deserialize (stream_a);
}
}
void rai::bulk_pull_account::visit (rai::message_visitor & visitor_a) const
{
visitor_a.bulk_pull_account (*this);
}
bool rai::bulk_pull_account::deserialize (rai::stream & stream_a)
{
assert (header.type == rai::message_type::bulk_pull_account);
auto result (read (stream_a, account));
if (!result)
{
result = read (stream_a, minimum_amount);
if (!result)
{
result = read (stream_a, flags);
}
}
return result;
}
void rai::bulk_pull_account::serialize (rai::stream & stream_a)
{
header.serialize (stream_a);
write (stream_a, account);
write (stream_a, minimum_amount);
write (stream_a, flags);
}
rai::bulk_pull_blocks::bulk_pull_blocks () :
message (rai::message_type::bulk_pull_blocks)
{

View file

@ -140,13 +140,19 @@ enum class message_type : uint8_t
bulk_push = 0x7,
frontier_req = 0x8,
bulk_pull_blocks = 0x9,
node_id_handshake = 0x0a
node_id_handshake = 0x0a,
bulk_pull_account = 0x0b
};
enum class bulk_pull_blocks_mode : uint8_t
{
list_blocks,
checksum_blocks
};
enum class bulk_pull_account_flags : uint8_t
{
pending_hash_and_amount = 0x0,
pending_address_only = 0x1
};
class message_visitor;
class message_header
{
@ -277,6 +283,18 @@ public:
rai::uint256_union start;
rai::block_hash end;
};
class bulk_pull_account : public message
{
public:
bulk_pull_account ();
bulk_pull_account (bool &, rai::stream &, rai::message_header const &);
bool deserialize (rai::stream &) override;
void serialize (rai::stream &) override;
void visit (rai::message_visitor &) const override;
rai::uint256_union account;
rai::uint128_union minimum_amount;
bulk_pull_account_flags flags;
};
class bulk_pull_blocks : public message
{
public:
@ -321,6 +339,7 @@ public:
virtual void confirm_req (rai::confirm_req const &) = 0;
virtual void confirm_ack (rai::confirm_ack const &) = 0;
virtual void bulk_pull (rai::bulk_pull const &) = 0;
virtual void bulk_pull_account (rai::bulk_pull_account const &) = 0;
virtual void bulk_pull_blocks (rai::bulk_pull_blocks const &) = 0;
virtual void bulk_push (rai::bulk_push const &) = 0;
virtual void frontier_req (rai::frontier_req const &) = 0;

View file

@ -422,6 +422,10 @@ public:
{
assert (false);
}
void bulk_pull_account (rai::bulk_pull_account const &) override
{
assert (false);
}
void bulk_pull_blocks (rai::bulk_pull_blocks const &) override
{
assert (false);

View file

@ -362,6 +362,9 @@ std::string rai::stat::detail_to_string (uint32_t key)
case rai::stat::detail::bulk_pull:
res = "bulk_pull";
break;
case rai::stat::detail::bulk_pull_account:
res = "bulk_pull_account";
break;
case rai::stat::detail::bulk_pull_blocks:
res = "bulk_pull_blocks";
break;

View file

@ -216,6 +216,7 @@ public:
initiate,
bulk_pull,
bulk_push,
bulk_pull_account,
bulk_pull_blocks,
frontier_req,