Bulk pull ascending flag (#3867)
* Add a new flag to the bulk_pull messaged called 'ascending'. This flag requests the server to operate on account chains in ascending order toward higher block hight, rather than descending order toward the initial block. * Increasing protocol version indicating support for the bulk_pull ascending flag. * Adding separate convenience function section and commenting node::process function.
This commit is contained in:
		
					parent
					
						
							
								6dd610b112
							
						
					
				
			
			
				commit
				
					
						a8681a4a06
					
				
			
		
					 8 changed files with 157 additions and 11 deletions
				
			
		| 
						 | 
				
			
			@ -110,6 +110,108 @@ TEST (bulk_pull, get_next_on_open)
 | 
			
		|||
	ASSERT_EQ (request->current, request->request->end);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
	Tests that the ascending flag is respected in the bulk_pull message when given a known block hash
 | 
			
		||||
 */
 | 
			
		||||
TEST (bulk_pull, ascending_one_hash)
 | 
			
		||||
{
 | 
			
		||||
	nano::system system{ 1 };
 | 
			
		||||
	auto & node = *system.nodes[0];
 | 
			
		||||
	nano::state_block_builder builder;
 | 
			
		||||
	auto block1 = builder
 | 
			
		||||
				  .account (nano::dev::genesis_key.pub)
 | 
			
		||||
				  .previous (nano::dev::genesis->hash ())
 | 
			
		||||
				  .representative (nano::dev::genesis_key.pub)
 | 
			
		||||
				  .balance (nano::dev::constants.genesis_amount - 100)
 | 
			
		||||
				  .link (nano::dev::genesis_key.pub)
 | 
			
		||||
				  .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
 | 
			
		||||
				  .work (0)
 | 
			
		||||
				  .build_shared ();
 | 
			
		||||
	node.work_generate_blocking (*block1);
 | 
			
		||||
	ASSERT_EQ (nano::process_result::progress, node.process (*block1).code);
 | 
			
		||||
	auto socket = std::make_shared<nano::socket> (node, nano::socket::endpoint_type_t::server);
 | 
			
		||||
	auto connection = std::make_shared<nano::bootstrap_server> (socket, system.nodes[0]);
 | 
			
		||||
	auto req = std::make_unique<nano::bulk_pull> (nano::dev::network_params.network);
 | 
			
		||||
	req->start = nano::dev::genesis->hash ();
 | 
			
		||||
	req->end = nano::dev::genesis->hash ();
 | 
			
		||||
	req->header.flag_set (nano::message_header::bulk_pull_ascending_flag);
 | 
			
		||||
	connection->requests.push (std::unique_ptr<nano::message>{});
 | 
			
		||||
	auto request = std::make_shared<nano::bulk_pull_server> (connection, std::move (req));
 | 
			
		||||
	auto block_out1 = request->get_next ();
 | 
			
		||||
	ASSERT_NE (nullptr, block_out1);
 | 
			
		||||
	ASSERT_EQ (block_out1->hash (), nano::dev::genesis->hash ());
 | 
			
		||||
	ASSERT_EQ (nullptr, request->get_next ());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
	Tests that the ascending flag is respected in the bulk_pull message when given an account number
 | 
			
		||||
 */
 | 
			
		||||
TEST (bulk_pull, ascending_two_account)
 | 
			
		||||
{
 | 
			
		||||
	nano::system system{ 1 };
 | 
			
		||||
	auto & node = *system.nodes[0];
 | 
			
		||||
	nano::state_block_builder builder;
 | 
			
		||||
	auto block1 = builder
 | 
			
		||||
				  .account (nano::dev::genesis_key.pub)
 | 
			
		||||
				  .previous (nano::dev::genesis->hash ())
 | 
			
		||||
				  .representative (nano::dev::genesis_key.pub)
 | 
			
		||||
				  .balance (nano::dev::constants.genesis_amount - 100)
 | 
			
		||||
				  .link (nano::dev::genesis_key.pub)
 | 
			
		||||
				  .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
 | 
			
		||||
				  .work (0)
 | 
			
		||||
				  .build_shared ();
 | 
			
		||||
	node.work_generate_blocking (*block1);
 | 
			
		||||
	ASSERT_EQ (nano::process_result::progress, node.process (*block1).code);
 | 
			
		||||
	auto socket = std::make_shared<nano::socket> (node, nano::socket::endpoint_type_t::server);
 | 
			
		||||
	auto connection = std::make_shared<nano::bootstrap_server> (socket, system.nodes[0]);
 | 
			
		||||
	auto req = std::make_unique<nano::bulk_pull> (nano::dev::network_params.network);
 | 
			
		||||
	req->start = nano::dev::genesis->hash ();
 | 
			
		||||
	req->end.clear ();
 | 
			
		||||
	req->header.flag_set (nano::message_header::bulk_pull_ascending_flag);
 | 
			
		||||
	connection->requests.push (std::unique_ptr<nano::message>{});
 | 
			
		||||
	auto request = std::make_shared<nano::bulk_pull_server> (connection, std::move (req));
 | 
			
		||||
	auto block_out1 = request->get_next ();
 | 
			
		||||
	ASSERT_NE (nullptr, block_out1);
 | 
			
		||||
	ASSERT_EQ (block_out1->hash (), nano::dev::genesis->hash ());
 | 
			
		||||
	auto block_out2 = request->get_next ();
 | 
			
		||||
	ASSERT_NE (nullptr, block_out2);
 | 
			
		||||
	ASSERT_EQ (block_out2->hash (), block1->hash ());
 | 
			
		||||
	ASSERT_EQ (nullptr, request->get_next ());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
	Tests that the `end' value is respected in the bulk_pull message
 | 
			
		||||
 */
 | 
			
		||||
TEST (bulk_pull, ascending_end)
 | 
			
		||||
{
 | 
			
		||||
	nano::system system{ 1 };
 | 
			
		||||
	auto & node = *system.nodes[0];
 | 
			
		||||
	nano::state_block_builder builder;
 | 
			
		||||
	auto block1 = builder
 | 
			
		||||
				  .account (nano::dev::genesis_key.pub)
 | 
			
		||||
				  .previous (nano::dev::genesis->hash ())
 | 
			
		||||
				  .representative (nano::dev::genesis_key.pub)
 | 
			
		||||
				  .balance (nano::dev::constants.genesis_amount - 100)
 | 
			
		||||
				  .link (nano::dev::genesis_key.pub)
 | 
			
		||||
				  .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
 | 
			
		||||
				  .work (0)
 | 
			
		||||
				  .build_shared ();
 | 
			
		||||
	node.work_generate_blocking (*block1);
 | 
			
		||||
	ASSERT_EQ (nano::process_result::progress, node.process (*block1).code);
 | 
			
		||||
	auto socket = std::make_shared<nano::socket> (node, nano::socket::endpoint_type_t::server);
 | 
			
		||||
	auto connection = std::make_shared<nano::bootstrap_server> (socket, system.nodes[0]);
 | 
			
		||||
	auto req = std::make_unique<nano::bulk_pull> (nano::dev::network_params.network);
 | 
			
		||||
	req->start = nano::dev::genesis_key.pub;
 | 
			
		||||
	req->end = block1->hash ();
 | 
			
		||||
	req->header.flag_set (nano::message_header::bulk_pull_ascending_flag);
 | 
			
		||||
	connection->requests.push (std::unique_ptr<nano::message>{});
 | 
			
		||||
	auto request = std::make_shared<nano::bulk_pull_server> (connection, std::move (req));
 | 
			
		||||
	auto block_out1 = request->get_next ();
 | 
			
		||||
	ASSERT_NE (nullptr, block_out1);
 | 
			
		||||
	ASSERT_EQ (block_out1->hash (), nano::dev::genesis->hash ());
 | 
			
		||||
	ASSERT_EQ (nullptr, request->get_next ());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST (bulk_pull, by_block)
 | 
			
		||||
{
 | 
			
		||||
	nano::system system (1);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -258,3 +258,21 @@ TEST (confirm_ack, empty_vote_hashes)
 | 
			
		|||
	auto vote = std::make_shared<nano::vote> (key.pub, key.prv, 0, 0, std::vector<nano::block_hash>{} /* empty */);
 | 
			
		||||
	nano::confirm_ack message{ nano::dev::network_params.network, vote };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST (message, bulk_pull_serialization)
 | 
			
		||||
{
 | 
			
		||||
	nano::bulk_pull message_in{ nano::dev::network_params.network };
 | 
			
		||||
	message_in.header.flag_set (nano::message_header::bulk_pull_ascending_flag);
 | 
			
		||||
	std::vector<uint8_t> bytes;
 | 
			
		||||
	{
 | 
			
		||||
		nano::vectorstream stream{ bytes };
 | 
			
		||||
		message_in.serialize (stream);
 | 
			
		||||
	}
 | 
			
		||||
	nano::bufferstream stream{ bytes.data (), bytes.size () };
 | 
			
		||||
	bool error = false;
 | 
			
		||||
	nano::message_header header{ error, stream };
 | 
			
		||||
	ASSERT_FALSE (error);
 | 
			
		||||
	nano::bulk_pull message_out{ error, stream, header };
 | 
			
		||||
	ASSERT_FALSE (error);
 | 
			
		||||
	ASSERT_TRUE (header.bulk_pull_ascending ());
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -274,7 +274,7 @@ public:
 | 
			
		|||
	/** Initial value is ACTIVE_NETWORK compile flag, but can be overridden by a CLI flag */
 | 
			
		||||
	static nano::networks active_network;
 | 
			
		||||
	/** Current protocol version */
 | 
			
		||||
	uint8_t const protocol_version = 0x12;
 | 
			
		||||
	uint8_t const protocol_version = 0x13;
 | 
			
		||||
	/** Minimum accepted protocol version */
 | 
			
		||||
	uint8_t const protocol_version_min = 0x12;
 | 
			
		||||
};
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -365,7 +365,7 @@ void nano::bulk_pull_server::set_current_end ()
 | 
			
		|||
		}
 | 
			
		||||
		else
 | 
			
		||||
		{
 | 
			
		||||
			current = info.head;
 | 
			
		||||
			current = ascending () ? info.open_block : info.head;
 | 
			
		||||
			if (!request->end.is_zero ())
 | 
			
		||||
			{
 | 
			
		||||
				auto account (connection->node->ledger.account (transaction, request->end));
 | 
			
		||||
| 
						 | 
				
			
			@ -394,7 +394,7 @@ void nano::bulk_pull_server::set_current_end ()
 | 
			
		|||
 | 
			
		||||
void nano::bulk_pull_server::send_next ()
 | 
			
		||||
{
 | 
			
		||||
	auto block (get_next ());
 | 
			
		||||
	auto block = get_next ();
 | 
			
		||||
	if (block != nullptr)
 | 
			
		||||
	{
 | 
			
		||||
		std::vector<uint8_t> send_buffer;
 | 
			
		||||
| 
						 | 
				
			
			@ -402,12 +402,11 @@ void nano::bulk_pull_server::send_next ()
 | 
			
		|||
			nano::vectorstream stream (send_buffer);
 | 
			
		||||
			nano::serialize_block (stream, *block);
 | 
			
		||||
		}
 | 
			
		||||
		auto this_l (shared_from_this ());
 | 
			
		||||
		if (connection->node->config.logging.bulk_pull_logging ())
 | 
			
		||||
		{
 | 
			
		||||
			connection->node->logger.try_log (boost::str (boost::format ("Sending block: %1%") % block->hash ().to_string ()));
 | 
			
		||||
		}
 | 
			
		||||
		connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l] (boost::system::error_code const & ec, std::size_t size_a) {
 | 
			
		||||
		connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
 | 
			
		||||
			this_l->sent_action (ec, size_a);
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -461,10 +460,10 @@ std::shared_ptr<nano::block> nano::bulk_pull_server::get_next ()
 | 
			
		|||
		result = connection->node->block (current);
 | 
			
		||||
		if (result != nullptr && set_current_to_end == false)
 | 
			
		||||
		{
 | 
			
		||||
			auto previous (result->previous ());
 | 
			
		||||
			if (!previous.is_zero ())
 | 
			
		||||
			auto next = ascending () ? result->sideband ().successor : result->previous ();
 | 
			
		||||
			if (!next.is_zero ())
 | 
			
		||||
			{
 | 
			
		||||
				current = previous;
 | 
			
		||||
				current = next;
 | 
			
		||||
			}
 | 
			
		||||
			else
 | 
			
		||||
			{
 | 
			
		||||
| 
						 | 
				
			
			@ -532,6 +531,11 @@ void nano::bulk_pull_server::no_block_sent (boost::system::error_code const & ec
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool nano::bulk_pull_server::ascending () const
 | 
			
		||||
{
 | 
			
		||||
	return request->header.bulk_pull_ascending ();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
nano::bulk_pull_server::bulk_pull_server (std::shared_ptr<nano::bootstrap_server> const & connection_a, std::unique_ptr<nano::bulk_pull> request_a) :
 | 
			
		||||
	connection (connection_a),
 | 
			
		||||
	request (std::move (request_a))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -103,6 +103,7 @@ public:
 | 
			
		|||
	void sent_action (boost::system::error_code const &, std::size_t);
 | 
			
		||||
	void send_finished ();
 | 
			
		||||
	void no_block_sent (boost::system::error_code const &, std::size_t);
 | 
			
		||||
	bool ascending () const;
 | 
			
		||||
	std::shared_ptr<nano::bootstrap_server> connection;
 | 
			
		||||
	std::unique_ptr<nano::bulk_pull> request;
 | 
			
		||||
	nano::block_hash current;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -214,6 +214,19 @@ bool nano::message_header::bulk_pull_is_count_present () const
 | 
			
		|||
	return result;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool nano::message_header::bulk_pull_ascending () const
 | 
			
		||||
{
 | 
			
		||||
	auto result (false);
 | 
			
		||||
	if (type == nano::message_type::bulk_pull)
 | 
			
		||||
	{
 | 
			
		||||
		if (extensions.test (bulk_pull_ascending_flag))
 | 
			
		||||
		{
 | 
			
		||||
			result = true;
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return result;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool nano::message_header::frontier_req_is_only_confirmed_present () const
 | 
			
		||||
{
 | 
			
		||||
	auto result (false);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -223,7 +223,9 @@ public:
 | 
			
		|||
 | 
			
		||||
	void flag_set (uint8_t);
 | 
			
		||||
	static uint8_t constexpr bulk_pull_count_present_flag = 0;
 | 
			
		||||
	static uint8_t constexpr bulk_pull_ascending_flag = 1;
 | 
			
		||||
	bool bulk_pull_is_count_present () const;
 | 
			
		||||
	bool bulk_pull_ascending () const;
 | 
			
		||||
	static uint8_t constexpr frontier_req_only_confirmed = 1;
 | 
			
		||||
	bool frontier_req_is_only_confirmed_present () const;
 | 
			
		||||
	static uint8_t constexpr node_id_handshake_query_flag = 0;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -107,12 +107,9 @@ public:
 | 
			
		|||
	void process_confirmed_data (nano::transaction const &, std::shared_ptr<nano::block> const &, nano::block_hash const &, nano::account &, nano::uint128_t &, bool &, bool &, nano::account &);
 | 
			
		||||
	void process_confirmed (nano::election_status const &, uint64_t = 0);
 | 
			
		||||
	void process_active (std::shared_ptr<nano::block> const &);
 | 
			
		||||
	[[nodiscard]] nano::process_return process (nano::block &);
 | 
			
		||||
	nano::process_return process_local (std::shared_ptr<nano::block> const &);
 | 
			
		||||
	void process_local_async (std::shared_ptr<nano::block> const &);
 | 
			
		||||
	void keepalive_preconfigured (std::vector<std::string> const &);
 | 
			
		||||
	nano::block_hash latest (nano::account const &);
 | 
			
		||||
	nano::uint128_t balance (nano::account const &);
 | 
			
		||||
	std::shared_ptr<nano::block> block (nano::block_hash const &);
 | 
			
		||||
	std::pair<nano::uint128_t, nano::uint128_t> balance_pending (nano::account const &, bool only_confirmed);
 | 
			
		||||
	nano::uint128_t weight (nano::account const &);
 | 
			
		||||
| 
						 | 
				
			
			@ -213,6 +210,15 @@ public:
 | 
			
		|||
	// For tests only
 | 
			
		||||
	boost::optional<uint64_t> work_generate_blocking (nano::root const &);
 | 
			
		||||
 | 
			
		||||
public: // Testing convenience functions
 | 
			
		||||
	/**
 | 
			
		||||
		Creates a new write transaction and inserts `block' and returns result
 | 
			
		||||
		Transaction is comitted before function return
 | 
			
		||||
	 */
 | 
			
		||||
	[[nodiscard]] nano::process_return process (nano::block & block);
 | 
			
		||||
	nano::block_hash latest (nano::account const &);
 | 
			
		||||
	nano::uint128_t balance (nano::account const &);
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
	void long_inactivity_cleanup ();
 | 
			
		||||
	void epoch_upgrader_impl (nano::raw_key const &, nano::epoch, uint64_t, uint64_t);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue