Merge pull request #3812 from clemahieu/block_deserializer

Extracting nano::bootstrap::block_deserializer class
This commit is contained in:
clemahieu 2022-07-15 15:56:47 +01:00 committed by GitHub
commit 0a23b40b19
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 227 additions and 171 deletions

View file

@ -1,3 +1,4 @@
#include <nano/node/bootstrap/block_deserializer.hpp>
#include <nano/node/bootstrap/bootstrap_frontier.hpp>
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
#include <nano/test_common/system.hpp>
@ -1440,7 +1441,7 @@ TEST (bootstrap_processor, wallet_lazy_pending)
node0->block_processor.add (send2);
node0->block_processor.flush ();
// Start wallet lazy bootstrap
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
auto node1 = system.add_node ();
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.network.protocol_version);
auto wallet (node1->wallets.create (nano::random_wallet_id ()));
ASSERT_NE (nullptr, wallet);
@ -1448,7 +1449,6 @@ TEST (bootstrap_processor, wallet_lazy_pending)
node1->bootstrap_wallet ();
// Check processed blocks
ASSERT_TIMELY (10s, node1->ledger.block_or_pruned_exists (send2->hash ()));
node1->stop ();
}
TEST (bootstrap_processor, multiple_attempts)
@ -2033,3 +2033,8 @@ TEST (bulk_pull_account, basics)
ASSERT_EQ (nullptr, block_data.second.get ());
}
}
TEST (block_deserializer, construction)
{
auto deserializer = std::make_shared<nano::bootstrap::block_deserializer> ();
}

View file

@ -18,6 +18,8 @@ add_library(
active_transactions.cpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/block_deserializer.hpp
bootstrap/block_deserializer.cpp
bootstrap/bootstrap_attempt.hpp
bootstrap/bootstrap_attempt.cpp
bootstrap/bootstrap_bulk_pull.hpp

View file

@ -0,0 +1,65 @@
#include <nano/lib/blocks.hpp>
#include <nano/node/bootstrap/block_deserializer.hpp>
#include <nano/node/socket.hpp>
#include <nano/secure/buffer.hpp>
nano::bootstrap::block_deserializer::block_deserializer () :
read_buffer{ std::make_shared<std::vector<uint8_t>> () }
{
}
void nano::bootstrap::block_deserializer::read (nano::socket & socket, callback_type const && callback)
{
debug_assert (callback);
read_buffer->resize (1);
socket.async_read (read_buffer, 1, [this_l = shared_from_this (), &socket, callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
if (ec)
{
callback (ec, nullptr);
return;
}
if (size_a != 1)
{
callback (boost::asio::error::fault, nullptr);
return;
}
this_l->received_type (socket, std::move (callback));
});
}
void nano::bootstrap::block_deserializer::received_type (nano::socket & socket, callback_type const && callback)
{
nano::block_type type = static_cast<nano::block_type> (read_buffer->data ()[0]);
if (type == nano::block_type::not_a_block)
{
callback (boost::system::error_code{}, nullptr);
return;
}
auto size = nano::block::size (type);
if (size == 0)
{
callback (boost::asio::error::fault, nullptr);
return;
}
read_buffer->resize (size);
socket.async_read (read_buffer, size, [this_l = shared_from_this (), size, type, callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
if (ec)
{
callback (ec, nullptr);
return;
}
if (size_a != size)
{
callback (boost::asio::error::fault, nullptr);
return;
}
this_l->received_block (type, std::move (callback));
});
}
void nano::bootstrap::block_deserializer::received_block (nano::block_type type, callback_type const && callback)
{
nano::bufferstream stream{ read_buffer->data (), read_buffer->size () };
auto block = nano::deserialize_block (stream, type);
callback (boost::system::error_code{}, block);
}

View file

@ -0,0 +1,47 @@
#pragma once
#include <nano/lib/blocks.hpp>
#include <boost/system/error_code.hpp>
#include <memory>
#include <vector>
namespace nano
{
class block;
class socket;
namespace bootstrap
{
/**
* Class to read a block-type byte followed by a serialised block from a stream.
* It is typically used to used to read a series of block-types and blocks terminated by a not-a-block type.
*/
class block_deserializer : public std::enable_shared_from_this<nano::bootstrap::block_deserializer>
{
public:
using callback_type = std::function<void (boost::system::error_code, std::shared_ptr<nano::block>)>;
block_deserializer ();
/**
* Read a type-prefixed block from 'socket' and pass the result, or an error, to 'callback'
* A normal end to series of blocks is a marked by return no error and a nullptr for block.
*/
void read (nano::socket & socket, callback_type const && callback);
private:
/**
* Called by read method on receipt of a block type byte.
* The type byte will be in the read_buffer.
*/
void received_type (nano::socket & socket, callback_type const && callback);
/**
* Called by received_type when a block is received, it parses the block and calls the callback.
*/
void received_block (nano::block_type type, callback_type const && callback);
std::shared_ptr<std::vector<uint8_t>> read_buffer;
};
}
}

View file

@ -1,3 +1,4 @@
#include <nano/node/bootstrap/block_deserializer.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_bulk_pull.hpp>
#include <nano/node/bootstrap/bootstrap_connections.hpp>
@ -19,12 +20,10 @@ nano::pull_info::pull_info (nano::hash_or_account const & account_or_head_a, nan
}
nano::bulk_pull_client::bulk_pull_client (std::shared_ptr<nano::bootstrap_client> const & connection_a, std::shared_ptr<nano::bootstrap_attempt> const & attempt_a, nano::pull_info const & pull_a) :
connection (connection_a),
attempt (attempt_a),
known_account{},
pull (pull_a),
pull_blocks (0),
unexpected_count (0)
connection{ connection_a },
attempt{ attempt_a },
pull{ pull_a },
block_deserializer{ std::make_shared<nano::bootstrap::block_deserializer> () }
{
attempt->condition.notify_all ();
}
@ -121,166 +120,80 @@ void nano::bulk_pull_client::throttled_receive_block ()
void nano::bulk_pull_client::receive_block ()
{
auto this_l (shared_from_this ());
connection->socket->async_read (connection->receive_buffer, 1, [this_l] (boost::system::error_code const & ec, std::size_t size_a) {
if (!ec)
{
this_l->received_type ();
}
else
{
if (this_l->connection->node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error receiving block type: %1%") % ec.message ()));
}
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_receive_block_failure, nano::stat::dir::in);
this_l->network_error = true;
}
block_deserializer->read (*connection->socket, [this_l = shared_from_this ()] (boost::system::error_code ec, std::shared_ptr<nano::block> block) {
this_l->received_block (ec, block);
});
}
void nano::bulk_pull_client::received_type ()
void nano::bulk_pull_client::received_block (boost::system::error_code ec, std::shared_ptr<nano::block> block)
{
auto this_l (shared_from_this ());
nano::block_type type (static_cast<nano::block_type> (connection->receive_buffer->data ()[0]));
auto const & socket_l = connection->socket;
switch (type)
if (ec)
{
case nano::block_type::send:
{
socket_l->async_read (connection->receive_buffer, nano::send_block::size, [this_l, type] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::receive:
{
socket_l->async_read (connection->receive_buffer, nano::receive_block::size, [this_l, type] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::open:
{
socket_l->async_read (connection->receive_buffer, nano::open_block::size, [this_l, type] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::change:
{
socket_l->async_read (connection->receive_buffer, nano::change_block::size, [this_l, type] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::state:
{
socket_l->async_read (connection->receive_buffer, nano::state_block::size, [this_l, type] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::not_a_block:
{
// Avoid re-using slow peers, or peers that sent the wrong blocks.
if (!connection->pending_stop && (expected == pull.end || (pull.count != 0 && pull.count == pull_blocks)))
{
connection->connections.pool_connection (connection);
}
break;
}
default:
{
if (connection->node->config.logging.network_packet_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast<int> (type)));
}
break;
}
network_error = true;
return;
}
}
void nano::bulk_pull_client::received_block (boost::system::error_code const & ec, std::size_t size_a, nano::block_type type_a)
{
if (!ec)
if (block == nullptr)
{
nano::bufferstream stream (connection->receive_buffer->data (), size_a);
auto block (nano::deserialize_block (stream, type_a));
if (block != nullptr && !connection->node->network_params.work.validate_entry (*block))
// Avoid re-using slow peers, or peers that sent the wrong blocks.
if (!connection->pending_stop && (expected == pull.end || (pull.count != 0 && pull.count == pull_blocks)))
{
auto hash (block->hash ());
if (connection->node->config.logging.bulk_pull_logging ())
{
std::string block_l;
block->serialize_json (block_l, connection->node->config.logging.single_line_record ());
connection->node->logger.try_log (boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l));
}
// Is block expected?
bool block_expected (false);
// Unconfirmed head is used only for lazy destinations if legacy bootstrap is not available, see nano::bootstrap_attempt::lazy_destinations_increment (...)
bool unconfirmed_account_head (connection->node->flags.disable_legacy_bootstrap && pull_blocks == 0 && pull.retry_limit <= connection->node->network_params.bootstrap.lazy_retry_limit && expected == pull.account_or_head && block->account () == pull.account_or_head);
if (hash == expected || unconfirmed_account_head)
{
expected = block->previous ();
block_expected = true;
}
else
{
unexpected_count++;
}
if (pull_blocks == 0 && block_expected)
{
known_account = block->account ();
}
if (connection->block_count++ == 0)
{
connection->set_start_time (std::chrono::steady_clock::now ());
}
attempt->total_blocks++;
pull_blocks++;
bool stop_pull (attempt->process_block (block, known_account, pull_blocks, pull.count, block_expected, pull.retry_limit));
if (!stop_pull && !connection->hard_stop.load ())
{
/* Process block in lazy pull if not stopped
Stop usual pull request with unexpected block & more than 16k blocks processed
to prevent spam */
if (attempt->mode != nano::bootstrap_mode::legacy || unexpected_count < 16384)
{
throttled_receive_block ();
}
}
else if (stop_pull && block_expected)
{
connection->connections.pool_connection (connection);
}
}
else if (block == nullptr)
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Error deserializing block received from pull request");
}
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_deserialize_receive_block, nano::stat::dir::in);
}
else // Work invalid
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Insufficient work for bulk pull block: %1%") % block->hash ().to_string ()));
}
connection->node->stats.inc_detail_only (nano::stat::type::error, nano::stat::detail::insufficient_work);
connection->connections.pool_connection (connection);
}
return;
}
else
if (connection->node->network_params.work.validate_entry (*block))
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Error bulk receiving block: %1%") % ec.message ()));
connection->node->logger.try_log (boost::str (boost::format ("Insufficient work for bulk pull block: %1%") % block->hash ().to_string ()));
}
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_receive_block_failure, nano::stat::dir::in);
network_error = true;
connection->node->stats.inc_detail_only (nano::stat::type::error, nano::stat::detail::insufficient_work);
return;
}
auto hash = block->hash ();
if (connection->node->config.logging.bulk_pull_logging ())
{
std::string block_l;
block->serialize_json (block_l, connection->node->config.logging.single_line_record ());
connection->node->logger.try_log (boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l));
}
// Is block expected?
bool block_expected (false);
// Unconfirmed head is used only for lazy destinations if legacy bootstrap is not available, see nano::bootstrap_attempt::lazy_destinations_increment (...)
bool unconfirmed_account_head (connection->node->flags.disable_legacy_bootstrap && pull_blocks == 0 && pull.retry_limit <= connection->node->network_params.bootstrap.lazy_retry_limit && expected == pull.account_or_head && block->account () == pull.account_or_head);
if (hash == expected || unconfirmed_account_head)
{
expected = block->previous ();
block_expected = true;
}
else
{
unexpected_count++;
}
if (pull_blocks == 0 && block_expected)
{
known_account = block->account ();
}
if (connection->block_count++ == 0)
{
connection->set_start_time (std::chrono::steady_clock::now ());
}
attempt->total_blocks++;
pull_blocks++;
bool stop_pull (attempt->process_block (block, known_account, pull_blocks, pull.count, block_expected, pull.retry_limit));
if (!stop_pull && !connection->hard_stop.load ())
{
/* Process block in lazy pull if not stopped
Stop usual pull request with unexpected block & more than 16k blocks processed
to prevent spam */
if (attempt->mode != nano::bootstrap_mode::legacy || unexpected_count < 16384)
{
throttled_receive_block ();
}
}
else if (!stop_pull && block_expected)
{
connection->connections.pool_connection (connection);
}
}

View file

@ -8,6 +8,10 @@
namespace nano
{
class bootstrap_attempt;
namespace bootstrap
{
class block_deserializer;
};
class pull_info
{
public:
@ -37,17 +41,36 @@ public:
void request ();
void receive_block ();
void throttled_receive_block ();
void received_type ();
void received_block (boost::system::error_code const &, std::size_t, nano::block_type);
void received_block (boost::system::error_code ec, std::shared_ptr<nano::block> block);
nano::block_hash first ();
std::shared_ptr<nano::bootstrap_client> connection;
std::shared_ptr<nano::bootstrap_attempt> attempt;
nano::block_hash expected;
nano::account known_account;
nano::pull_info pull;
uint64_t pull_blocks;
uint64_t unexpected_count;
bool network_error{ false };
private:
/**
* Tracks the next block expected to be received starting with the block hash that was expected and followed by previous blocks for this account chain
*/
nano::block_hash expected{ 0 };
/**
* Tracks the account number for this account chain
* Used when an account chain has a mix between state blocks and legacy blocks which do not encode the account number in the block
* 0 if the account is unknown
*/
nano::account known_account{ 0 };
/**
* Original pull request
*/
nano::pull_info pull;
/**
* Tracks the number of blocks successfully deserialized
*/
uint64_t pull_blocks{ 0 };
/**
* Tracks the number of times an unexpected block was received
*/
uint64_t unexpected_count{ 0 };
std::shared_ptr<nano::bootstrap::block_deserializer> block_deserializer;
};
class bootstrap_attempt_wallet;
class bulk_pull_account_client final : public std::enable_shared_from_this<nano::bulk_pull_account_client>

View file

@ -233,25 +233,26 @@ void nano::bulk_push_server::received_block (boost::system::error_code const & e
{
nano::bufferstream stream (receive_buffer->data (), size_a);
auto block (nano::deserialize_block (stream, type_a));
if (block != nullptr && !connection->node->network_params.work.validate_entry (*block))
if (block != nullptr)
{
if (connection->node->network_params.work.validate_entry (*block))
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Insufficient work for bulk push block: %1%") % block->hash ().to_string ()));
}
connection->node->stats.inc_detail_only (nano::stat::type::error, nano::stat::detail::insufficient_work);
return;
}
connection->node->process_active (std::move (block));
throttled_receive ();
}
else if (block == nullptr)
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Error deserializing block received from pull request");
}
}
else // Work invalid
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Insufficient work for bulk push block: %1%") % block->hash ().to_string ()));
}
connection->node->stats.inc_detail_only (nano::stat::type::error, nano::stat::detail::insufficient_work);
}
}
}