Stateless bootstrap server (#3975)

* Add `not_a_block` serialization/deserialization support

* Add convenience `account_store::get` variant returning std::optional

* Pass by universal reference in `processing_queue`

* Add `asc_pull_req` & `asc_pull_ack` messages

messages.cpp

* Add support for new message types in `message_deserializer`

* Add handling of new message types inside `network` class

* Add new message types to stats

* Add support for new message types to `tcp_server`

* Small `tcp_server::to_bootstrap()` cleanup

* Introduce `bootstrap_server` class

* Move bandwidth_limiter to a separate component

* Introduce specialized `outbound_bandwidth_limiter` keeping track of different traffic types

* Use a separate bootstrap bandwidth limit in `bootstrap_server`

* Formatting

* Add config options for bootstrap bandwidth limits

* Add tests for `bootstrap_server`

* Comments

* Add tests for new block types

* Include starting block in bootstrap server response

This will be useful for querying which account contains source gapping block

* Fixes

* Filter channels with their capacity maxed out

* Remove needless `response_queue`

This eliminates the need for allocating response messages

* Add count field to `asc_pull_req` message

* Add `ledger::account_safe` variant

* Add additional account info in `asc_pull_ack` message

* Add stats

* Skip at_end check for asc_pull messages

* Use little endian when serializing multi byte integers

* Add tests for account_info payload type

* Formatting & gcc compilation fix

* Switch to big endian as standard encoding for network messages
This commit is contained in:
Piotr Wójcik 2022-11-01 19:31:23 +01:00 committed by GitHub
commit dd70d9a18c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
39 changed files with 2214 additions and 144 deletions

View file

@ -8,6 +8,7 @@ add_executable(
block_store.cpp
blockprocessor.cpp
bootstrap.cpp
bootstrap_server.cpp
cli.cpp
confirmation_height.cpp
confirmation_solicitor.cpp

View file

@ -0,0 +1,519 @@
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
#include <gtest/gtest.h>
#include <iterator>
using namespace std::chrono_literals;
namespace
{
using block_list_t = std::vector<std::shared_ptr<nano::block>>;
/**
* Creates `block_count` random send blocks in an account chain
*/
block_list_t setup_chain (nano::test::system & system, nano::node & node, nano::keypair key, int block_count)
{
auto latest = node.latest (key.pub);
auto balance = node.balance (key.pub);
std::vector<std::shared_ptr<nano::block>> blocks;
for (int n = 0; n < block_count; ++n)
{
nano::keypair throwaway;
nano::block_builder builder;
balance -= 1;
auto send = builder
.send ()
.previous (latest)
.destination (throwaway.pub)
.balance (balance)
.sign (key.prv, key.pub)
.work (*system.work.generate (latest))
.build_shared ();
latest = send->hash ();
blocks.push_back (send);
}
EXPECT_TRUE (nano::test::process (node, blocks));
// Confirm whole chain at once
EXPECT_TRUE (nano::test::confirm (node, { blocks.back () }));
EXPECT_TIMELY (5s, nano::test::confirmed (node, blocks));
return blocks;
}
/**
* Creates `count` account chains, each with `block_count` blocks
*/
std::vector<std::pair<nano::account, block_list_t>> setup_chains (nano::test::system & system, nano::node & node, int count, int block_count)
{
auto latest = node.latest (nano::dev::genesis_key.pub);
auto balance = node.balance (nano::dev::genesis_key.pub);
std::vector<std::pair<nano::account, block_list_t>> chains;
for (int n = 0; n < count; ++n)
{
nano::keypair key;
nano::block_builder builder;
balance -= block_count * 2; // Send enough to later create `block_count` blocks
auto send = builder
.send ()
.previous (latest)
.destination (key.pub)
.balance (balance)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (latest))
.build_shared ();
auto open = builder
.open ()
.source (send->hash ())
.representative (key.pub)
.account (key.pub)
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build_shared ();
latest = send->hash ();
// Ensure blocks are in the ledger and confirmed
EXPECT_TRUE (nano::test::process (node, { send, open }));
EXPECT_TRUE (nano::test::confirm (node, { send, open }));
EXPECT_TIMELY (5s, nano::test::confirmed (node, { send, open }));
auto added_blocks = setup_chain (system, node, key, block_count);
auto blocks = block_list_t{ open };
blocks.insert (blocks.end (), added_blocks.begin (), added_blocks.end ());
chains.emplace_back (key.pub, blocks);
}
return chains;
}
/**
* Helper to track responses in thread safe way
*/
class responses_helper final
{
public:
void add (nano::asc_pull_ack & ack)
{
nano::lock_guard<nano::mutex> lock{ mutex };
responses.push_back (ack);
}
std::vector<nano::asc_pull_ack> get ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
return responses;
}
std::size_t size ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
return responses.size ();
}
private:
nano::mutex mutex;
std::vector<nano::asc_pull_ack> responses;
};
/**
* Checks if both lists contain the same blocks, with `blocks_b` skipped by `skip` elements
*/
bool compare_blocks (std::vector<std::shared_ptr<nano::block>> blocks_a, std::vector<std::shared_ptr<nano::block>> blocks_b, int skip = 0)
{
debug_assert (blocks_b.size () >= blocks_a.size () + skip);
const auto count = blocks_a.size ();
for (int n = 0; n < count; ++n)
{
auto & block_a = *blocks_a[n];
auto & block_b = *blocks_b[n + skip];
// nano::block does not have != operator
if (!(block_a == block_b))
{
return false;
}
}
return true;
}
}
TEST (bootstrap_server, serve_account_blocks)
{
nano::test::system system{};
auto & node = *system.add_node ();
responses_helper responses;
node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) {
responses.add (response);
});
auto chains = setup_chains (system, node, 1, 128);
auto [first_account, first_blocks] = chains.front ();
// Request blocks from account root
nano::asc_pull_req request{ node.network_params.network };
request.id = 7;
request.type = nano::asc_pull_type::blocks;
nano::asc_pull_req::blocks_payload request_payload;
request_payload.start = first_account;
request_payload.count = nano::bootstrap_server::max_blocks;
request.payload = request_payload;
request.update_header ();
node.network.inbound (request, nano::test::fake_channel (node));
ASSERT_TIMELY (5s, responses.size () == 1);
auto response = responses.get ().front ();
// Ensure we got response exactly for what we asked for
ASSERT_EQ (response.id, 7);
ASSERT_EQ (response.type, nano::asc_pull_type::blocks);
nano::asc_pull_ack::blocks_payload response_payload;
ASSERT_NO_THROW (response_payload = std::get<nano::asc_pull_ack::blocks_payload> (response.payload));
ASSERT_EQ (response_payload.blocks.size (), 128);
ASSERT_TRUE (compare_blocks (response_payload.blocks, first_blocks));
// Ensure we don't get any unexpected responses
ASSERT_ALWAYS (1s, responses.size () == 1);
}
TEST (bootstrap_server, serve_hash)
{
nano::test::system system{};
auto & node = *system.add_node ();
responses_helper responses;
node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) {
responses.add (response);
});
auto chains = setup_chains (system, node, 1, 256);
auto [account, blocks] = chains.front ();
// Skip a few blocks to request hash in the middle of the chain
blocks = block_list_t (std::next (blocks.begin (), 9), blocks.end ());
// Request blocks from the middle of the chain
nano::asc_pull_req request{ node.network_params.network };
request.id = 7;
request.type = nano::asc_pull_type::blocks;
nano::asc_pull_req::blocks_payload request_payload;
request_payload.start = blocks.front ()->hash ();
request_payload.count = nano::bootstrap_server::max_blocks;
request.payload = request_payload;
request.update_header ();
node.network.inbound (request, nano::test::fake_channel (node));
ASSERT_TIMELY (5s, responses.size () == 1);
auto response = responses.get ().front ();
// Ensure we got response exactly for what we asked for
ASSERT_EQ (response.id, 7);
ASSERT_EQ (response.type, nano::asc_pull_type::blocks);
nano::asc_pull_ack::blocks_payload response_payload;
ASSERT_NO_THROW (response_payload = std::get<nano::asc_pull_ack::blocks_payload> (response.payload));
ASSERT_EQ (response_payload.blocks.size (), 128);
ASSERT_TRUE (compare_blocks (response_payload.blocks, blocks));
// Ensure we don't get any unexpected responses
ASSERT_ALWAYS (1s, responses.size () == 1);
}
TEST (bootstrap_server, serve_hash_one)
{
nano::test::system system{};
auto & node = *system.add_node ();
responses_helper responses;
node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) {
responses.add (response);
});
auto chains = setup_chains (system, node, 1, 256);
auto [account, blocks] = chains.front ();
// Skip a few blocks to request hash in the middle of the chain
blocks = block_list_t (std::next (blocks.begin (), 9), blocks.end ());
// Request blocks from the middle of the chain
nano::asc_pull_req request{ node.network_params.network };
request.id = 7;
request.type = nano::asc_pull_type::blocks;
nano::asc_pull_req::blocks_payload request_payload;
request_payload.start = blocks.front ()->hash ();
request_payload.count = 1;
request.payload = request_payload;
request.update_header ();
node.network.inbound (request, nano::test::fake_channel (node));
ASSERT_TIMELY (5s, responses.size () == 1);
auto response = responses.get ().front ();
// Ensure we got response exactly for what we asked for
ASSERT_EQ (response.id, 7);
ASSERT_EQ (response.type, nano::asc_pull_type::blocks);
nano::asc_pull_ack::blocks_payload response_payload;
ASSERT_NO_THROW (response_payload = std::get<nano::asc_pull_ack::blocks_payload> (response.payload));
ASSERT_EQ (response_payload.blocks.size (), 1);
ASSERT_TRUE (response_payload.blocks.front ()->hash () == request_payload.start);
}
TEST (bootstrap_server, serve_end_of_chain)
{
nano::test::system system{};
auto & node = *system.add_node ();
responses_helper responses;
node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) {
responses.add (response);
});
auto chains = setup_chains (system, node, 1, 128);
auto [account, blocks] = chains.front ();
// Request blocks from account frontier
nano::asc_pull_req request{ node.network_params.network };
request.id = 7;
request.type = nano::asc_pull_type::blocks;
nano::asc_pull_req::blocks_payload request_payload;
request_payload.start = blocks.back ()->hash ();
request_payload.count = nano::bootstrap_server::max_blocks;
request.payload = request_payload;
request.update_header ();
node.network.inbound (request, nano::test::fake_channel (node));
ASSERT_TIMELY (5s, responses.size () == 1);
auto response = responses.get ().front ();
// Ensure we got response exactly for what we asked for
ASSERT_EQ (response.id, 7);
ASSERT_EQ (response.type, nano::asc_pull_type::blocks);
nano::asc_pull_ack::blocks_payload response_payload;
ASSERT_NO_THROW (response_payload = std::get<nano::asc_pull_ack::blocks_payload> (response.payload));
// Response should contain only the last block from chain
ASSERT_EQ (response_payload.blocks.size (), 1);
ASSERT_EQ (*response_payload.blocks.front (), *blocks.back ());
}
TEST (bootstrap_server, serve_missing)
{
nano::test::system system{};
auto & node = *system.add_node ();
responses_helper responses;
node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) {
responses.add (response);
});
auto chains = setup_chains (system, node, 1, 128);
// Request blocks from account frontier
nano::asc_pull_req request{ node.network_params.network };
request.id = 7;
request.type = nano::asc_pull_type::blocks;
nano::asc_pull_req::blocks_payload request_payload;
request_payload.start = nano::test::random_hash ();
request_payload.count = nano::bootstrap_server::max_blocks;
request.payload = request_payload;
request.update_header ();
node.network.inbound (request, nano::test::fake_channel (node));
ASSERT_TIMELY (5s, responses.size () == 1);
auto response = responses.get ().front ();
// Ensure we got response exactly for what we asked for
ASSERT_EQ (response.id, 7);
ASSERT_EQ (response.type, nano::asc_pull_type::blocks);
nano::asc_pull_ack::blocks_payload response_payload;
ASSERT_NO_THROW (response_payload = std::get<nano::asc_pull_ack::blocks_payload> (response.payload));
// There should be nothing sent
ASSERT_EQ (response_payload.blocks.size (), 0);
}
TEST (bootstrap_server, serve_multiple)
{
nano::test::system system{};
auto & node = *system.add_node ();
responses_helper responses;
node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) {
responses.add (response);
});
auto chains = setup_chains (system, node, 32, 16);
{
// Request blocks from multiple chains at once
int next_id = 0;
for (auto & [account, blocks] : chains)
{
// Request blocks from account root
nano::asc_pull_req request{ node.network_params.network };
request.id = next_id++;
request.type = nano::asc_pull_type::blocks;
nano::asc_pull_req::blocks_payload request_payload;
request_payload.start = account;
request_payload.count = nano::bootstrap_server::max_blocks;
request.payload = request_payload;
request.update_header ();
node.network.inbound (request, nano::test::fake_channel (node));
}
}
ASSERT_TIMELY (15s, responses.size () == chains.size ());
auto all_responses = responses.get ();
{
int next_id = 0;
for (auto & [account, blocks] : chains)
{
// Find matching response
auto response_it = std::find_if (all_responses.begin (), all_responses.end (), [&] (auto ack) { return ack.id == next_id; });
ASSERT_TRUE (response_it != all_responses.end ());
auto response = *response_it;
// Ensure we got response exactly for what we asked for
ASSERT_EQ (response.id, next_id);
ASSERT_EQ (response.type, nano::asc_pull_type::blocks);
nano::asc_pull_ack::blocks_payload response_payload;
ASSERT_NO_THROW (response_payload = std::get<nano::asc_pull_ack::blocks_payload> (response.payload));
ASSERT_EQ (response_payload.blocks.size (), 17); // 1 open block + 16 random blocks
ASSERT_TRUE (compare_blocks (response_payload.blocks, blocks));
++next_id;
}
}
// Ensure we don't get any unexpected responses
ASSERT_ALWAYS (1s, responses.size () == chains.size ());
}
TEST (bootstrap_server, serve_account_info)
{
nano::test::system system{};
auto & node = *system.add_node ();
responses_helper responses;
node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) {
responses.add (response);
});
auto chains = setup_chains (system, node, 1, 128);
auto [account, blocks] = chains.front ();
// Request blocks from account root
nano::asc_pull_req request{ node.network_params.network };
request.id = 7;
request.type = nano::asc_pull_type::account_info;
nano::asc_pull_req::account_info_payload request_payload;
request_payload.target = account;
request.payload = request_payload;
request.update_header ();
node.network.inbound (request, nano::test::fake_channel (node));
ASSERT_TIMELY (5s, responses.size () == 1);
auto response = responses.get ().front ();
// Ensure we got response exactly for what we asked for
ASSERT_EQ (response.id, 7);
ASSERT_EQ (response.type, nano::asc_pull_type::account_info);
nano::asc_pull_ack::account_info_payload response_payload;
ASSERT_NO_THROW (response_payload = std::get<nano::asc_pull_ack::account_info_payload> (response.payload));
ASSERT_EQ (response_payload.account, account);
ASSERT_EQ (response_payload.account_open, blocks.front ()->hash ());
ASSERT_EQ (response_payload.account_head, blocks.back ()->hash ());
ASSERT_EQ (response_payload.account_block_count, blocks.size ());
ASSERT_EQ (response_payload.account_conf_frontier, blocks.back ()->hash ());
ASSERT_EQ (response_payload.account_conf_height, blocks.size ());
// Ensure we don't get any unexpected responses
ASSERT_ALWAYS (1s, responses.size () == 1);
}
TEST (bootstrap_server, serve_account_info_missing)
{
nano::test::system system{};
auto & node = *system.add_node ();
responses_helper responses;
node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) {
responses.add (response);
});
auto chains = setup_chains (system, node, 1, 128);
auto [account, blocks] = chains.front ();
// Request blocks from account root
nano::asc_pull_req request{ node.network_params.network };
request.id = 7;
request.type = nano::asc_pull_type::account_info;
nano::asc_pull_req::account_info_payload request_payload;
request_payload.target = nano::test::random_account ();
request.payload = request_payload;
request.update_header ();
node.network.inbound (request, nano::test::fake_channel (node));
ASSERT_TIMELY (5s, responses.size () == 1);
auto response = responses.get ().front ();
// Ensure we got response exactly for what we asked for
ASSERT_EQ (response.id, 7);
ASSERT_EQ (response.type, nano::asc_pull_type::account_info);
nano::asc_pull_ack::account_info_payload response_payload;
ASSERT_NO_THROW (response_payload = std::get<nano::asc_pull_ack::account_info_payload> (response.payload));
ASSERT_EQ (response_payload.account, request_payload.target);
ASSERT_EQ (response_payload.account_open, 0);
ASSERT_EQ (response_payload.account_head, 0);
ASSERT_EQ (response_payload.account_block_count, 0);
ASSERT_EQ (response_payload.account_conf_frontier, 0);
ASSERT_EQ (response_payload.account_conf_height, 0);
// Ensure we don't get any unexpected responses
ASSERT_ALWAYS (1s, responses.size () == 1);
}

View file

@ -1,12 +1,30 @@
#include <nano/node/common.hpp>
#include <nano/node/network.hpp>
#include <nano/secure/buffer.hpp>
#include <nano/test_common/testutil.hpp>
#include <gtest/gtest.h>
#include <boost/algorithm/string.hpp>
#include <boost/variant/get.hpp>
namespace
{
std::shared_ptr<nano::block> random_block ()
{
nano::block_builder builder;
auto block = builder
.send ()
.previous (nano::test::random_hash ())
.destination (nano::keypair ().pub)
.balance (2)
.sign (nano::keypair ().prv, 4)
.work (5)
.build_shared ();
return block;
}
}
TEST (message, keepalive_serialization)
{
nano::keepalive request1{ nano::dev::network_params.network };
@ -45,15 +63,7 @@ TEST (message, keepalive_deserialize)
TEST (message, publish_serialization)
{
nano::block_builder builder;
auto block = builder
.send ()
.previous (0)
.destination (1)
.balance (2)
.sign (nano::keypair ().prv, 4)
.work (5)
.build_shared ();
auto block = random_block ();
nano::publish publish{ nano::dev::network_params.network, block };
ASSERT_EQ (nano::block_type::send, publish.header.block_type ());
std::vector<uint8_t> bytes;
@ -277,30 +287,181 @@ TEST (message, bulk_pull_serialization)
ASSERT_TRUE (header.bulk_pull_ascending ());
}
TEST (message, keepalive_to_string)
TEST (message, asc_pull_req_serialization_blocks)
{
nano::message_header hdr{ nano::dev::network_params.network, nano::message_type::keepalive };
std::string expected = hdr.to_string ();
nano::asc_pull_req original{ nano::dev::network_params.network };
original.id = 7;
original.type = nano::asc_pull_type::blocks;
nano::keepalive keepalive = nano::keepalive (nano::dev::network_params.network);
ASSERT_EQ (keepalive.to_string (), expected + "\n:::0\n:::0\n:::0\n:::0\n:::0\n:::0\n:::0\n:::0");
nano::asc_pull_req::blocks_payload original_payload;
original_payload.start = nano::test::random_hash ();
original_payload.count = 111;
expected.append ("\n:::0");
original.payload = original_payload;
original.update_header ();
keepalive.peers[1] = nano::endpoint{ boost::asio::ip::make_address_v6 ("::1"), 45 };
expected.append ("\n::1:45");
keepalive.peers[2] = nano::endpoint{ boost::asio::ip::make_address_v6 ("2001:db8:85a3:8d3:1319:8a2e:370:7348"), 0 };
expected.append ("\n2001:db8:85a3:8d3:1319:8a2e:370:7348:0");
keepalive.peers[3] = nano::endpoint{ boost::asio::ip::make_address_v6 ("::"), 65535 };
expected.append ("\n:::65535");
for (int i = 4; i < keepalive.peers.size (); i++)
// Serialize
std::vector<uint8_t> bytes;
{
keepalive.peers[i] = nano::endpoint{ boost::asio::ip::make_address_v6 ("::ffff:1.2.3.4"), 1234 };
expected.append ("\n::ffff:1.2.3.4:1234");
nano::vectorstream stream{ bytes };
original.serialize (stream);
}
nano::bufferstream stream{ bytes.data (), bytes.size () };
// Header
bool error = false;
nano::message_header header (error, stream);
ASSERT_FALSE (error);
ASSERT_EQ (nano::message_type::asc_pull_req, header.type);
// Message
nano::asc_pull_req message (error, stream, header);
ASSERT_FALSE (error);
ASSERT_EQ (original.id, message.id);
ASSERT_EQ (original.type, message.type);
nano::asc_pull_req::blocks_payload message_payload;
ASSERT_NO_THROW (message_payload = std::get<nano::asc_pull_req::blocks_payload> (message.payload));
ASSERT_EQ (original_payload.start, message_payload.start);
ASSERT_EQ (original_payload.count, message_payload.count);
ASSERT_TRUE (nano::at_end (stream));
}
TEST (message, asc_pull_req_serialization_account_info)
{
nano::asc_pull_req original{ nano::dev::network_params.network };
original.id = 7;
original.type = nano::asc_pull_type::account_info;
nano::asc_pull_req::account_info_payload original_payload;
original_payload.target = nano::test::random_hash ();
original.payload = original_payload;
original.update_header ();
// Serialize
std::vector<uint8_t> bytes;
{
nano::vectorstream stream{ bytes };
original.serialize (stream);
}
nano::bufferstream stream{ bytes.data (), bytes.size () };
// Header
bool error = false;
nano::message_header header (error, stream);
ASSERT_FALSE (error);
ASSERT_EQ (nano::message_type::asc_pull_req, header.type);
// Message
nano::asc_pull_req message (error, stream, header);
ASSERT_FALSE (error);
ASSERT_EQ (original.id, message.id);
ASSERT_EQ (original.type, message.type);
nano::asc_pull_req::account_info_payload message_payload;
ASSERT_NO_THROW (message_payload = std::get<nano::asc_pull_req::account_info_payload> (message.payload));
ASSERT_EQ (original_payload.target, message_payload.target);
ASSERT_TRUE (nano::at_end (stream));
}
TEST (message, asc_pull_ack_serialization_blocks)
{
nano::asc_pull_ack original{ nano::dev::network_params.network };
original.id = 11;
original.type = nano::asc_pull_type::blocks;
nano::asc_pull_ack::blocks_payload original_payload;
// Generate blocks
const int num_blocks = 128; // Maximum allowed
for (int n = 0; n < num_blocks; ++n)
{
original_payload.blocks.push_back (random_block ());
}
ASSERT_EQ (keepalive.to_string (), expected);
original.payload = original_payload;
original.update_header ();
// Serialize
std::vector<uint8_t> bytes;
{
nano::vectorstream stream{ bytes };
original.serialize (stream);
}
nano::bufferstream stream{ bytes.data (), bytes.size () };
// Header
bool error = false;
nano::message_header header (error, stream);
ASSERT_FALSE (error);
ASSERT_EQ (nano::message_type::asc_pull_ack, header.type);
// Message
nano::asc_pull_ack message (error, stream, header);
ASSERT_FALSE (error);
ASSERT_EQ (original.id, message.id);
ASSERT_EQ (original.type, message.type);
nano::asc_pull_ack::blocks_payload message_payload;
ASSERT_NO_THROW (message_payload = std::get<nano::asc_pull_ack::blocks_payload> (message.payload));
// Compare blocks
ASSERT_EQ (original_payload.blocks.size (), message_payload.blocks.size ());
ASSERT_TRUE (std::equal (original_payload.blocks.begin (), original_payload.blocks.end (), message_payload.blocks.begin (), message_payload.blocks.end (), [] (auto a, auto b) {
return *a == *b;
}));
ASSERT_TRUE (nano::at_end (stream));
}
TEST (message, asc_pull_ack_serialization_account_info)
{
nano::asc_pull_ack original{ nano::dev::network_params.network };
original.id = 11;
original.type = nano::asc_pull_type::account_info;
nano::asc_pull_ack::account_info_payload original_payload;
original_payload.account = nano::test::random_account ();
original_payload.account_open = nano::test::random_hash ();
original_payload.account_head = nano::test::random_hash ();
original_payload.account_block_count = 932932132;
original_payload.account_conf_frontier = nano::test::random_hash ();
original_payload.account_conf_height = 847312;
original.payload = original_payload;
original.update_header ();
// Serialize
std::vector<uint8_t> bytes;
{
nano::vectorstream stream{ bytes };
original.serialize (stream);
}
nano::bufferstream stream{ bytes.data (), bytes.size () };
// Header
bool error = false;
nano::message_header header (error, stream);
ASSERT_FALSE (error);
ASSERT_EQ (nano::message_type::asc_pull_ack, header.type);
// Message
nano::asc_pull_ack message (error, stream, header);
ASSERT_FALSE (error);
ASSERT_EQ (original.id, message.id);
ASSERT_EQ (original.type, message.type);
nano::asc_pull_ack::account_info_payload message_payload;
ASSERT_NO_THROW (message_payload = std::get<nano::asc_pull_ack::account_info_payload> (message.payload));
ASSERT_EQ (original_payload.account, message_payload.account);
ASSERT_EQ (original_payload.account_open, message_payload.account_open);
ASSERT_EQ (original_payload.account_head, message_payload.account_head);
ASSERT_EQ (original_payload.account_block_count, message_payload.account_block_count);
ASSERT_EQ (original_payload.account_conf_frontier, message_payload.account_conf_frontier);
ASSERT_EQ (original_payload.account_conf_height, message_payload.account_conf_height);
ASSERT_TRUE (nano::at_end (stream));
}

View file

@ -1144,7 +1144,7 @@ TEST (network, bandwidth_limiter)
ASSERT_TIMELY (1s, 1 == node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
// change the bandwidth settings, 2 packets will be dropped
node.network.set_bandwidth_params (1.1, message_size * 2);
node.set_bandwidth_params (message_size * 2, 1.1);
channel1->send (message);
channel2->send (message);
channel1->send (message);
@ -1152,7 +1152,7 @@ TEST (network, bandwidth_limiter)
ASSERT_TIMELY (1s, 3 == node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
// change the bandwidth settings, no packet will be dropped
node.network.set_bandwidth_params (4, message_size);
node.set_bandwidth_params (message_size, 4);
channel1->send (message);
channel2->send (message);
channel1->send (message);

View file

@ -152,6 +152,8 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.backup_before_upgrade, defaults.node.backup_before_upgrade);
ASSERT_EQ (conf.node.bandwidth_limit, defaults.node.bandwidth_limit);
ASSERT_EQ (conf.node.bandwidth_limit_burst_ratio, defaults.node.bandwidth_limit_burst_ratio);
ASSERT_EQ (conf.node.bootstrap_bandwidth_limit, defaults.node.bootstrap_bandwidth_limit);
ASSERT_EQ (conf.node.bootstrap_bandwidth_burst_ratio, defaults.node.bootstrap_bandwidth_burst_ratio);
ASSERT_EQ (conf.node.block_processor_batch_max_time, defaults.node.block_processor_batch_max_time);
ASSERT_EQ (conf.node.bootstrap_connections, defaults.node.bootstrap_connections);
ASSERT_EQ (conf.node.bootstrap_connections_max, defaults.node.bootstrap_connections_max);
@ -392,6 +394,8 @@ TEST (toml, daemon_config_deserialize_no_defaults)
backup_before_upgrade = true
bandwidth_limit = 999
bandwidth_limit_burst_ratio = 999.9
bootstrap_bandwidth_limit = 999
bootstrap_bandwidth_burst_ratio = 999.9
block_processor_batch_max_time = 999
bootstrap_connections = 999
bootstrap_connections_max = 999
@ -557,6 +561,8 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.backup_before_upgrade, defaults.node.backup_before_upgrade);
ASSERT_NE (conf.node.bandwidth_limit, defaults.node.bandwidth_limit);
ASSERT_NE (conf.node.bandwidth_limit_burst_ratio, defaults.node.bandwidth_limit_burst_ratio);
ASSERT_NE (conf.node.bootstrap_bandwidth_limit, defaults.node.bootstrap_bandwidth_limit);
ASSERT_NE (conf.node.bootstrap_bandwidth_burst_ratio, defaults.node.bootstrap_bandwidth_burst_ratio);
ASSERT_NE (conf.node.block_processor_batch_max_time, defaults.node.block_processor_batch_max_time);
ASSERT_NE (conf.node.bootstrap_connections, defaults.node.bootstrap_connections);
ASSERT_NE (conf.node.bootstrap_connections_max, defaults.node.bootstrap_connections_max);

View file

@ -1362,6 +1362,17 @@ std::shared_ptr<nano::block> nano::deserialize_block_json (boost::property_tree:
return result;
}
void nano::serialize_block_type (nano::stream & stream, const nano::block_type & type)
{
nano::write (stream, type);
}
void nano::serialize_block (nano::stream & stream_a, nano::block const & block_a)
{
nano::serialize_block_type (stream_a, block_a.type ());
block_a.serialize (stream_a);
}
std::shared_ptr<nano::block> nano::deserialize_block (nano::stream & stream_a)
{
nano::block_type type;
@ -1404,6 +1415,11 @@ std::shared_ptr<nano::block> nano::deserialize_block (nano::stream & stream_a, n
result = ::deserialize_block<nano::state_block> (stream_a);
break;
}
case nano::block_type::not_a_block:
{
// Skip null block terminators
return {};
}
default:
#ifndef NANO_FUZZER_TEST
debug_assert (false);

View file

@ -417,6 +417,14 @@ std::unique_ptr<container_info_component> collect_container_info (block_uniquer
std::shared_ptr<nano::block> deserialize_block (nano::stream &);
std::shared_ptr<nano::block> deserialize_block (nano::stream &, nano::block_type, nano::block_uniquer * = nullptr);
std::shared_ptr<nano::block> deserialize_block_json (boost::property_tree::ptree const &, nano::block_uniquer * = nullptr);
/**
* Serialize block type as an 8-bit value
*/
void serialize_block_type (nano::stream &, nano::block_type const &);
/**
* Serialize a block prefixed with an 8-bit typecode
*/
void serialize_block (nano::stream &, nano::block const &);
void block_memory_pool_purge ();
}

View file

@ -69,12 +69,12 @@ public:
/**
* Queues item for batch processing
*/
void add (T const & item)
void add (T && item)
{
nano::unique_lock<nano::mutex> lock{ mutex };
if (queue.size () < max_queue_size)
{
queue.emplace_back (item);
queue.emplace_back (std::forward<T> (item));
lock.unlock ();
condition.notify_one ();
stats.inc (stat_type, nano::stat::detail::queue);
@ -131,7 +131,7 @@ private:
for (int n = 0; n < max_batch_size; ++n)
{
debug_assert (!queue.empty ());
queue_l.emplace_back (queue.front ());
queue_l.emplace_back (std::move (queue.front ()));
queue.pop_front ();
}
return queue_l;

View file

@ -551,6 +551,15 @@ std::string nano::stat::type_to_string (stat::type type)
case nano::stat::type::blockprocessor:
res = "blockprocessor";
break;
case nano::stat::type::bootstrap_server:
res = "bootstrap_server";
break;
case nano::stat::type::bootstrap_server_requests:
res = "bootstrap_server_requests";
break;
case nano::stat::type::bootstrap_server_responses:
res = "bootstrap_server_responses";
break;
}
return res;
}
@ -731,6 +740,12 @@ std::string nano::stat::detail_to_string (stat::detail detail)
case nano::stat::detail::telemetry_ack:
res = "telemetry_ack";
break;
case nano::stat::detail::asc_pull_req:
res = "asc_pull_req";
break;
case nano::stat::detail::asc_pull_ack:
res = "asc_pull_ack";
break;
case nano::stat::detail::state_block:
res = "state_block";
break;
@ -889,6 +904,12 @@ std::string nano::stat::detail_to_string (stat::detail detail)
case nano::stat::detail::invalid_frontier_req_message:
res = "invalid_frontier_req_message";
break;
case nano::stat::detail::invalid_asc_pull_req_message:
res = "invalid_asc_pull_req_message";
break;
case nano::stat::detail::invalid_asc_pull_ack_message:
res = "invalid_asc_pull_ack_message";
break;
case nano::stat::detail::message_too_big:
res = "message_too_big";
break;
@ -988,6 +1009,33 @@ std::string nano::stat::detail_to_string (stat::detail detail)
case nano::stat::detail::missing_block:
res = "missing_block";
break;
case nano::stat::detail::response:
res = "response";
break;
case nano::stat::detail::write_drop:
res = "write_drop";
break;
case nano::stat::detail::write_error:
res = "write_error";
break;
case nano::stat::detail::blocks:
res = "blocks";
break;
case nano::stat::detail::drop:
res = "drop";
break;
case nano::stat::detail::bad_count:
res = "bad_count";
break;
case nano::stat::detail::response_blocks:
res = "response_blocks";
break;
case nano::stat::detail::response_account_info:
res = "response_account_info";
break;
case nano::stat::detail::channel_full:
res = "channel_full";
break;
}
return res;
}

View file

@ -247,6 +247,9 @@ public:
vote_cache,
hinting,
blockprocessor,
bootstrap_server,
bootstrap_server_requests,
bootstrap_server_responses,
};
/** Optional detail type */
@ -304,6 +307,8 @@ public:
node_id_handshake,
telemetry_req,
telemetry_ack,
asc_pull_req,
asc_pull_ack,
// bootstrap, callback
initiate,
@ -369,6 +374,8 @@ public:
invalid_bulk_pull_message,
invalid_bulk_pull_account_message,
invalid_frontier_req_message,
invalid_asc_pull_req_message,
invalid_asc_pull_ack_message,
message_too_big,
outdated_version,
udp_max_per_ip,
@ -435,6 +442,17 @@ public:
hinted,
insert_failed,
missing_block,
// bootstrap server
response,
write_drop,
write_error,
blocks,
drop,
bad_count,
response_blocks,
response_account_info,
channel_full,
};
/** Direction of the stat. If the direction is irrelevant, use in */

View file

@ -2,12 +2,15 @@
#include <nano/lib/utility.hpp>
#include <boost/endian/conversion.hpp>
#include <streambuf>
namespace nano
{
// We operate on streams of uint8_t by convention
using stream = std::basic_streambuf<uint8_t>;
// Read a raw byte stream the size of `T' and fill value. Returns true if there was an error, false otherwise
template <typename T>
bool try_read (nano::stream & stream_a, T & value_a)
@ -16,6 +19,7 @@ bool try_read (nano::stream & stream_a, T & value_a)
auto amount_read (stream_a.sgetn (reinterpret_cast<uint8_t *> (&value_a), sizeof (value_a)));
return amount_read != sizeof (value_a);
}
// A wrapper of try_read which throws if there is an error
template <typename T>
void read (nano::stream & stream_a, T & value)
@ -51,4 +55,28 @@ inline void write (nano::stream & stream_a, std::vector<uint8_t> const & value_a
(void)amount_written;
debug_assert (amount_written == value_a.size ());
}
inline bool at_end (nano::stream & stream)
{
uint8_t junk;
auto end (nano::try_read (stream, junk));
return end;
}
/*
* We use big endian as standard for all network communications
*/
template <typename T>
void write_big_endian (nano::stream & stream, T const & value)
{
nano::write (stream, boost::endian::native_to_big (value));
}
template <typename T>
void read_big_endian (nano::stream & stream, T & value)
{
T tmp;
nano::read (stream, tmp);
value = boost::endian::big_to_native (tmp);
}
}

View file

@ -102,6 +102,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::vote_generator_queue:
thread_role_name_string = "Voting que";
break;
case nano::thread_role::name::bootstrap_server:
thread_role_name_string = "Bootstrp serv";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
@ -322,4 +325,17 @@ unsigned int nano::hardware_concurrency ()
return std::thread::hardware_concurrency ();
}
return value;
}
bool nano::join_or_pass (std::thread & thread)
{
if (thread.joinable ())
{
thread.join ();
return true;
}
else
{
return false;
}
}

View file

@ -46,6 +46,7 @@ namespace thread_role
backlog_population,
election_hinting,
vote_generator_queue,
bootstrap_server,
};
/*
@ -203,8 +204,13 @@ private:
std::unique_ptr<nano::container_info_component> collect_container_info (thread_pool & thread_pool, std::string const & name);
/*
/**
* Number of available logical processor cores. Might be overridden by setting `NANO_HARDWARE_CONCURRENCY` environment variable
*/
unsigned int hardware_concurrency ();
/**
* If thread is joinable joins it, otherwise does nothing
*/
bool join_or_pass (std::thread &);
}

View file

@ -18,6 +18,8 @@ add_library(
active_transactions.cpp
backlog_population.hpp
backlog_population.cpp
bandwidth_limiter.hpp
bandwidth_limiter.cpp
block_arrival.hpp
block_arrival.cpp
blockprocessor.hpp
@ -40,6 +42,8 @@ add_library(
bootstrap/bootstrap_legacy.cpp
bootstrap/bootstrap.hpp
bootstrap/bootstrap.cpp
bootstrap/bootstrap_server.hpp
bootstrap/bootstrap_server.cpp
cli.hpp
cli.cpp
common.hpp

View file

@ -0,0 +1,56 @@
#include <nano/lib/utility.hpp>
#include <nano/node/bandwidth_limiter.hpp>
/*
* bandwidth_limiter
*/
nano::bandwidth_limiter::bandwidth_limiter (std::size_t limit_a, double burst_ratio_a) :
bucket (static_cast<std::size_t> (limit_a * burst_ratio_a), limit_a)
{
}
bool nano::bandwidth_limiter::should_pass (std::size_t message_size_a)
{
return bucket.try_consume (nano::narrow_cast<unsigned int> (message_size_a));
}
void nano::bandwidth_limiter::reset (std::size_t limit_a, double burst_ratio_a)
{
bucket.reset (static_cast<std::size_t> (limit_a * burst_ratio_a), limit_a);
}
/*
* outbound_bandwidth_limiter
*/
nano::outbound_bandwidth_limiter::outbound_bandwidth_limiter (nano::outbound_bandwidth_limiter::config config_a) :
config_m{ config_a },
limiter_standard (config_m.standard_limit, config_m.standard_burst_ratio),
limiter_bootstrap{ config_m.bootstrap_limit, config_m.bootstrap_burst_ratio }
{
}
nano::bandwidth_limiter & nano::outbound_bandwidth_limiter::select_limiter (nano::bandwidth_limit_type type)
{
switch (type)
{
case bandwidth_limit_type::bootstrap:
return limiter_bootstrap;
default:
return limiter_standard;
}
debug_assert (false);
}
bool nano::outbound_bandwidth_limiter::should_pass (std::size_t buffer_size, nano::bandwidth_limit_type type)
{
auto & limiter = select_limiter (type);
return limiter.should_pass (buffer_size);
}
void nano::outbound_bandwidth_limiter::reset (std::size_t limit, double burst_ratio, nano::bandwidth_limit_type type)
{
auto & limiter = select_limiter (type);
limiter.reset (limit, burst_ratio);
}

View file

@ -0,0 +1,73 @@
#pragma once
#include <nano/lib/rate_limiting.hpp>
namespace nano
{
/**
* Enumeration for different bandwidth limits for different traffic types
*/
enum class bandwidth_limit_type
{
/** For all message */
standard,
/** For bootstrap (asc_pull_ack, asc_pull_req) traffic */
bootstrap
};
/**
* Class that tracks and manages bandwidth limits for IO operations
*/
class bandwidth_limiter final
{
public:
// initialize with limit 0 = unbounded
bandwidth_limiter (std::size_t limit, double burst_ratio);
bool should_pass (std::size_t buffer_size);
void reset (std::size_t limit, double burst_ratio);
private:
nano::rate::token_bucket bucket;
};
class outbound_bandwidth_limiter final
{
public: // Config
struct config
{
// standard
std::size_t standard_limit;
double standard_burst_ratio;
// bootstrap
std::size_t bootstrap_limit;
double bootstrap_burst_ratio;
};
public:
explicit outbound_bandwidth_limiter (config);
/**
* Check whether packet falls withing bandwidth limits and should be allowed
* @return true if OK, false if needs to be dropped
*/
bool should_pass (std::size_t buffer_size, bandwidth_limit_type);
/**
* Reset limits of selected limiter type to values passed in arguments
*/
void reset (std::size_t limit, double burst_ratio, bandwidth_limit_type = bandwidth_limit_type::standard);
private:
/**
* Returns reference to limiter corresponding to the limit type
*/
bandwidth_limiter & select_limiter (bandwidth_limit_type);
private:
const config config_m;
private:
bandwidth_limiter limiter_standard;
bandwidth_limiter limiter_bootstrap;
};
}

View file

@ -0,0 +1,291 @@
#include <nano/node/bootstrap/bootstrap_server.hpp>
#include <nano/node/transport/transport.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/store.hpp>
#include <boost/asio/error.hpp>
// TODO: Make threads configurable
nano::bootstrap_server::bootstrap_server (nano::store & store_a, nano::ledger & ledger_a, nano::network_constants const & network_constants_a, nano::stat & stats_a) :
store{ store_a },
ledger{ ledger_a },
network_constants{ network_constants_a },
stats{ stats_a },
request_queue{ stats, nano::stat::type::bootstrap_server, nano::thread_role::name::bootstrap_server, /* threads */ 1, /* max size */ 1024 * 16, /* max batch */ 128 }
{
request_queue.process_batch = [this] (auto & batch) {
process_batch (batch);
};
}
nano::bootstrap_server::~bootstrap_server ()
{
stop ();
}
void nano::bootstrap_server::start ()
{
request_queue.start ();
}
void nano::bootstrap_server::stop ()
{
request_queue.stop ();
}
bool nano::bootstrap_server::verify_request_type (nano::asc_pull_type type) const
{
switch (type)
{
case asc_pull_type::invalid:
return false;
case asc_pull_type::blocks:
case asc_pull_type::account_info:
return true;
}
return false;
}
bool nano::bootstrap_server::verify (const nano::asc_pull_req & message) const
{
if (!verify_request_type (message.type))
{
return false;
}
struct verify_visitor
{
bool operator() (nano::empty_payload const &) const
{
return false;
}
bool operator() (nano::asc_pull_req::blocks_payload const & pld) const
{
return pld.count > 0 && pld.count <= max_blocks;
}
bool operator() (nano::asc_pull_req::account_info_payload const & pld) const
{
return !pld.target.is_zero ();
}
};
return std::visit (verify_visitor{}, message.payload);
}
bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::shared_ptr<nano::transport::channel> channel)
{
if (!verify (message))
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::invalid);
return false;
}
// If channel is full our response will be dropped anyway, so filter that early
// TODO: Add per channel limits (this ideally should be done on the channel message processing side)
if (channel->max ())
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::in);
return false;
}
request_queue.add (std::make_pair (message, channel));
return true;
}
void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared_ptr<nano::transport::channel> & channel)
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::response, nano::stat::dir::out);
// Increase relevant stats depending on payload type
struct stat_visitor
{
nano::stat & stats;
void operator() (nano::empty_payload const &)
{
debug_assert (false, "missing payload");
}
void operator() (nano::asc_pull_ack::blocks_payload const & pld)
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::response_blocks, nano::stat::dir::out);
stats.add (nano::stat::type::bootstrap_server, nano::stat::detail::blocks, nano::stat::dir::out, pld.blocks.size ());
}
void operator() (nano::asc_pull_ack::account_info_payload const & pld)
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::response_account_info, nano::stat::dir::out);
}
};
std::visit (stat_visitor{ stats }, response.payload);
on_response.notify (response, channel);
channel->send (
response, [this] (auto & ec, auto size) {
if (ec)
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::write_error, nano::stat::dir::out);
}
},
nano::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap);
}
/*
* Requests
*/
void nano::bootstrap_server::process_batch (std::deque<request_t> & batch)
{
auto transaction = store.tx_begin_read ();
for (auto & [request, channel] : batch)
{
if (!channel->max ())
{
auto response = process (transaction, request);
respond (response, channel);
}
else
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::out);
}
}
}
nano::asc_pull_ack nano::bootstrap_server::process (nano::transaction const & transaction, const nano::asc_pull_req & message)
{
return std::visit ([this, &transaction, &message] (auto && request) { return process (transaction, message.id, request); }, message.payload);
}
nano::asc_pull_ack nano::bootstrap_server::process (const nano::transaction &, nano::asc_pull_req::id_t id, const nano::empty_payload & request)
{
// Empty payload should never be possible, but return empty response anyway
debug_assert (false, "missing payload");
nano::asc_pull_ack response{ network_constants };
response.id = id;
response.type = nano::asc_pull_type::invalid;
return response;
}
/*
* Blocks response
*/
nano::asc_pull_ack nano::bootstrap_server::process (nano::transaction const & transaction, nano::asc_pull_req::id_t id, nano::asc_pull_req::blocks_payload const & request)
{
const std::size_t count = std::min (static_cast<std::size_t> (request.count), max_blocks);
// `start` can represent either account or block hash
if (store.block.exists (transaction, request.start.as_block_hash ()))
{
return prepare_response (transaction, id, request.start.as_block_hash (), count);
}
if (store.account.exists (transaction, request.start.as_account ()))
{
auto info = store.account.get (transaction, request.start.as_account ());
if (info)
{
// Start from open block if pulling by account
return prepare_response (transaction, id, info->open_block, count);
}
else
{
debug_assert (false, "account exists but cannot be retrieved");
}
}
// Neither block nor account found, send empty response to indicate that
return prepare_empty_blocks_response (id);
}
nano::asc_pull_ack nano::bootstrap_server::prepare_response (nano::transaction const & transaction, nano::asc_pull_req::id_t id, nano::block_hash start_block, std::size_t count)
{
debug_assert (count <= max_blocks);
auto blocks = prepare_blocks (transaction, start_block, count);
debug_assert (blocks.size () <= count);
nano::asc_pull_ack response{ network_constants };
response.id = id;
response.type = nano::asc_pull_type::blocks;
nano::asc_pull_ack::blocks_payload response_payload;
response_payload.blocks = blocks;
response.payload = response_payload;
response.update_header ();
return response;
}
nano::asc_pull_ack nano::bootstrap_server::prepare_empty_blocks_response (nano::asc_pull_req::id_t id)
{
nano::asc_pull_ack response{ network_constants };
response.id = id;
response.type = nano::asc_pull_type::blocks;
nano::asc_pull_ack::blocks_payload empty_payload{};
response.payload = empty_payload;
response.update_header ();
return response;
}
std::vector<std::shared_ptr<nano::block>> nano::bootstrap_server::prepare_blocks (nano::transaction const & transaction, nano::block_hash start_block, std::size_t count) const
{
debug_assert (count <= max_blocks);
std::vector<std::shared_ptr<nano::block>> result;
if (!start_block.is_zero ())
{
std::shared_ptr<nano::block> current = store.block.get (transaction, start_block);
while (current && result.size () < count)
{
result.push_back (current);
auto successor = current->sideband ().successor;
current = store.block.get (transaction, successor);
}
}
return result;
}
/*
* Account info response
*/
nano::asc_pull_ack nano::bootstrap_server::process (const nano::transaction & transaction, nano::asc_pull_req::id_t id, const nano::asc_pull_req::account_info_payload & request)
{
nano::asc_pull_ack response{ network_constants };
response.id = id;
response.type = nano::asc_pull_type::account_info;
auto target = request.target.as_account ();
// Try to lookup account assuming target is block hash
if (auto account_from_hash = ledger.account_safe (transaction, request.target.as_block_hash ()); !account_from_hash.is_zero ())
{
target = account_from_hash;
}
// Otherwise assume target is an actual account
nano::asc_pull_ack::account_info_payload response_payload{};
response_payload.account = target;
auto account_info = store.account.get (transaction, target);
if (account_info)
{
response_payload.account_open = account_info->open_block;
response_payload.account_head = account_info->head;
response_payload.account_block_count = account_info->block_count;
auto conf_info = store.confirmation_height.get (transaction, target);
if (conf_info)
{
response_payload.account_conf_frontier = conf_info->frontier;
response_payload.account_conf_height = conf_info->height;
}
}
// If account is missing the response payload will contain all 0 fields, except for the target
response.payload = response_payload;
response.update_header ();
return response;
}

View file

@ -0,0 +1,87 @@
#pragma once
#include <nano/lib/observer_set.hpp>
#include <nano/lib/processing_queue.hpp>
#include <nano/node/messages.hpp>
#include <memory>
#include <queue>
#include <utility>
namespace nano
{
class ledger;
namespace transport
{
class channel;
}
/**
* Processes bootstrap requests (`asc_pull_req` messages) and replies with bootstrap responses (`asc_pull_ack`)
*
* In order to ensure maximum throughput, there are two internal processing queues:
* - One for doing ledger lookups and preparing responses (`request_queue`)
* - One for sending back those responses over the network (`response_queue`)
*/
class bootstrap_server final
{
public:
// `asc_pull_req` message is small, store by value
using request_t = std::pair<nano::asc_pull_req, std::shared_ptr<nano::transport::channel>>; // <request, response channel>
public:
bootstrap_server (nano::store &, nano::ledger &, nano::network_constants const &, nano::stat &);
~bootstrap_server ();
void start ();
void stop ();
/**
* Process `asc_pull_req` message coming from network.
* Reply will be sent back over passed in `channel`
*/
bool request (nano::asc_pull_req const & message, std::shared_ptr<nano::transport::channel> channel);
public: // Events
nano::observer_set<nano::asc_pull_ack &, std::shared_ptr<nano::transport::channel> &> on_response;
private:
void process_batch (std::deque<request_t> & batch);
nano::asc_pull_ack process (nano::transaction const &, nano::asc_pull_req const & message);
void respond (nano::asc_pull_ack &, std::shared_ptr<nano::transport::channel> &);
nano::asc_pull_ack process (nano::transaction const &, nano::asc_pull_req::id_t id, nano::empty_payload const & request);
/*
* Blocks response
*/
nano::asc_pull_ack process (nano::transaction const &, nano::asc_pull_req::id_t id, nano::asc_pull_req::blocks_payload const & request);
nano::asc_pull_ack prepare_response (nano::transaction const &, nano::asc_pull_req::id_t id, nano::block_hash start_block, std::size_t count);
nano::asc_pull_ack prepare_empty_blocks_response (nano::asc_pull_req::id_t id);
std::vector<std::shared_ptr<nano::block>> prepare_blocks (nano::transaction const &, nano::block_hash start_block, std::size_t count) const;
/*
* Account info response
*/
nano::asc_pull_ack process (nano::transaction const &, nano::asc_pull_req::id_t id, nano::asc_pull_req::account_info_payload const & request);
/*
* Checks if the request should be dropped early on
*/
bool verify (nano::asc_pull_req const & message) const;
bool verify_request_type (nano::asc_pull_type) const;
private: // Dependencies
nano::store & store;
nano::ledger & ledger;
nano::network_constants const & network_constants;
nano::stat & stats;
private:
processing_queue<request_t> request_queue;
public: // Config
/** Maximum number of blocks to send in a single response, cannot be higher than capacity of a single `asc_pull_ack` message */
constexpr static std::size_t max_blocks = nano::asc_pull_ack::blocks_payload::max_blocks;
};
}

View file

@ -72,9 +72,9 @@ bool nano::message_header::deserialize (nano::stream & stream_a)
return error;
}
std::string nano::to_string (nano::message_type message_type_l)
std::string nano::to_string (nano::message_type type)
{
switch (message_type_l)
switch (type)
{
case nano::message_type::invalid:
return "invalid";
@ -102,15 +102,19 @@ std::string nano::to_string (nano::message_type message_type_l)
return "telemetry_req";
case nano::message_type::telemetry_ack:
return "telemetry_ack";
case nano::message_type::asc_pull_req:
return "asc_pull_req";
case nano::message_type::asc_pull_ack:
return "asc_pull_ack";
// default case intentionally omitted to cause warnings for unhandled enums
}
return "n/a";
}
nano::stat::detail nano::to_stat_detail (nano::message_type message_type)
nano::stat::detail nano::to_stat_detail (nano::message_type type)
{
switch (message_type)
switch (type)
{
case nano::message_type::invalid:
return nano::stat::detail::invalid;
@ -138,6 +142,10 @@ nano::stat::detail nano::to_stat_detail (nano::message_type message_type)
return nano::stat::detail::telemetry_req;
case nano::message_type::telemetry_ack:
return nano::stat::detail::telemetry_ack;
case nano::message_type::asc_pull_req:
return nano::stat::detail::asc_pull_req;
case nano::message_type::asc_pull_ack:
return nano::stat::detail::asc_pull_ack;
// default case intentionally omitted to cause warnings for unhandled enums
}
debug_assert (false);
@ -304,6 +312,14 @@ std::size_t nano::message_header::payload_length_bytes () const
{
return nano::telemetry_ack::size (*this);
}
case nano::message_type::asc_pull_req:
{
return nano::asc_pull_req::size (*this);
}
case nano::message_type::asc_pull_ack:
{
return nano::asc_pull_ack::size (*this);
}
default:
{
debug_assert (false);
@ -327,6 +343,8 @@ bool nano::message_header::is_valid_message_type () const
case nano::message_type::confirm_req:
case nano::message_type::node_id_handshake:
case nano::message_type::telemetry_ack:
case nano::message_type::asc_pull_req:
case nano::message_type::asc_pull_ack:
{
return true;
}
@ -364,6 +382,11 @@ nano::shared_const_buffer nano::message::to_shared_const_buffer () const
return shared_const_buffer (to_bytes ());
}
nano::message_type nano::message::type () const
{
return header.type;
}
/*
* message_parser
*/
@ -1596,4 +1619,321 @@ std::size_t nano::node_id_handshake::size (nano::message_header const & header_a
result += sizeof (nano::account) + sizeof (nano::signature);
}
return result;
}
/*
* asc_pull_req
*/
nano::asc_pull_req::asc_pull_req (const nano::network_constants & constants) :
message (constants, nano::message_type::asc_pull_req)
{
}
nano::asc_pull_req::asc_pull_req (bool & error, nano::stream & stream, const nano::message_header & header) :
message (header)
{
error = deserialize (stream);
}
void nano::asc_pull_req::visit (nano::message_visitor & visitor) const
{
visitor.asc_pull_req (*this);
}
void nano::asc_pull_req::serialize (nano::stream & stream) const
{
header.serialize (stream);
nano::write (stream, type);
nano::write_big_endian (stream, id);
serialize_payload (stream);
}
bool nano::asc_pull_req::deserialize (nano::stream & stream)
{
debug_assert (header.type == nano::message_type::asc_pull_req);
bool error = false;
try
{
nano::read (stream, type);
nano::read_big_endian (stream, id);
deserialize_payload (stream);
}
catch (std::runtime_error const &)
{
error = true;
}
return error;
}
void nano::asc_pull_req::serialize_payload (nano::stream & stream) const
{
debug_assert (verify_consistency ());
std::visit ([&stream] (auto && pld) { pld.serialize (stream); }, payload);
}
void nano::asc_pull_req::deserialize_payload (nano::stream & stream)
{
switch (type)
{
case asc_pull_type::blocks:
{
blocks_payload pld;
pld.deserialize (stream);
payload = pld;
break;
}
case asc_pull_type::account_info:
{
account_info_payload pld;
pld.deserialize (stream);
payload = pld;
break;
}
default:
throw std::runtime_error ("Unknown asc_pull_type");
}
}
void nano::asc_pull_req::update_header ()
{
std::vector<uint8_t> bytes;
{
nano::vectorstream payload_stream (bytes);
serialize_payload (payload_stream);
}
debug_assert (bytes.size () <= 65535u); // Max int16 for storing size
debug_assert (bytes.size () >= 1);
header.extensions = std::bitset<16> (bytes.size ());
}
std::size_t nano::asc_pull_req::size (const nano::message_header & header)
{
uint16_t payload_length = nano::narrow_cast<uint16_t> (header.extensions.to_ulong ());
return partial_size + payload_length;
}
bool nano::asc_pull_req::verify_consistency () const
{
struct consistency_visitor
{
nano::asc_pull_type type;
void operator() (empty_payload) const
{
debug_assert (false, "missing payload");
}
void operator() (blocks_payload) const
{
debug_assert (type == asc_pull_type::blocks);
}
void operator() (account_info_payload) const
{
debug_assert (type == asc_pull_type::account_info);
}
};
std::visit (consistency_visitor{ type }, payload);
return true; // Just for convenience of calling from asserts
}
/*
* asc_pull_req::blocks_payload
*/
void nano::asc_pull_req::blocks_payload::serialize (nano::stream & stream) const
{
nano::write (stream, start);
nano::write (stream, count);
}
void nano::asc_pull_req::blocks_payload::deserialize (nano::stream & stream)
{
nano::read (stream, start);
nano::read (stream, count);
}
/*
* asc_pull_req::account_info_payload
*/
void nano::asc_pull_req::account_info_payload::serialize (stream & stream) const
{
nano::write (stream, target);
}
void nano::asc_pull_req::account_info_payload::deserialize (stream & stream)
{
nano::read (stream, target);
}
/*
* asc_pull_ack
*/
nano::asc_pull_ack::asc_pull_ack (const nano::network_constants & constants) :
message (constants, nano::message_type::asc_pull_ack)
{
}
nano::asc_pull_ack::asc_pull_ack (bool & error, nano::stream & stream, const nano::message_header & header) :
message (header)
{
error = deserialize (stream);
}
void nano::asc_pull_ack::visit (nano::message_visitor & visitor) const
{
visitor.asc_pull_ack (*this);
}
void nano::asc_pull_ack::serialize (nano::stream & stream) const
{
debug_assert (header.extensions.to_ulong () > 0); // Block payload must have least `not_a_block` terminator
header.serialize (stream);
nano::write (stream, type);
nano::write_big_endian (stream, id);
serialize_payload (stream);
}
bool nano::asc_pull_ack::deserialize (nano::stream & stream)
{
debug_assert (header.type == nano::message_type::asc_pull_ack);
bool error = false;
try
{
nano::read (stream, type);
nano::read_big_endian (stream, id);
deserialize_payload (stream);
}
catch (std::runtime_error const &)
{
error = true;
}
return error;
}
void nano::asc_pull_ack::serialize_payload (nano::stream & stream) const
{
debug_assert (verify_consistency ());
std::visit ([&stream] (auto && pld) { pld.serialize (stream); }, payload);
}
void nano::asc_pull_ack::deserialize_payload (nano::stream & stream)
{
switch (type)
{
case asc_pull_type::blocks:
{
blocks_payload pld;
pld.deserialize (stream);
payload = pld;
break;
}
case asc_pull_type::account_info:
{
account_info_payload pld;
pld.deserialize (stream);
payload = pld;
break;
}
default:
throw std::runtime_error ("Unknown asc_pull_type");
}
}
void nano::asc_pull_ack::update_header ()
{
std::vector<uint8_t> bytes;
{
nano::vectorstream payload_stream (bytes);
serialize_payload (payload_stream);
}
debug_assert (bytes.size () <= 65535u); // Max int16 for storing size
debug_assert (bytes.size () >= 1);
header.extensions = std::bitset<16> (bytes.size ());
}
std::size_t nano::asc_pull_ack::size (const nano::message_header & header)
{
uint16_t payload_length = nano::narrow_cast<uint16_t> (header.extensions.to_ulong ());
return partial_size + payload_length;
}
bool nano::asc_pull_ack::verify_consistency () const
{
struct consistency_visitor
{
nano::asc_pull_type type;
void operator() (empty_payload) const
{
debug_assert (false, "missing payload");
}
void operator() (blocks_payload) const
{
debug_assert (type == asc_pull_type::blocks);
}
void operator() (account_info_payload) const
{
debug_assert (type == asc_pull_type::account_info);
}
};
std::visit (consistency_visitor{ type }, payload);
return true; // Just for convenience of calling from asserts
}
/*
* asc_pull_ack::blocks_payload
*/
void nano::asc_pull_ack::blocks_payload::serialize (nano::stream & stream) const
{
debug_assert (blocks.size () <= max_blocks);
for (auto & block : blocks)
{
debug_assert (block != nullptr);
nano::serialize_block (stream, *block);
}
// For convenience, end with null block terminator
nano::serialize_block_type (stream, nano::block_type::not_a_block);
}
void nano::asc_pull_ack::blocks_payload::deserialize (nano::stream & stream)
{
auto current = nano::deserialize_block (stream);
while (current && blocks.size () < max_blocks)
{
blocks.push_back (current);
current = nano::deserialize_block (stream);
}
}
/*
* asc_pull_ack::account_info_payload
*/
void nano::asc_pull_ack::account_info_payload::serialize (nano::stream & stream) const
{
nano::write (stream, account);
nano::write (stream, account_open);
nano::write (stream, account_head);
nano::write_big_endian (stream, account_block_count);
nano::write (stream, account_conf_frontier);
nano::write_big_endian (stream, account_conf_height);
}
void nano::asc_pull_ack::account_info_payload::deserialize (nano::stream & stream)
{
nano::read (stream, account);
nano::read (stream, account_open);
nano::read (stream, account_head);
nano::read_big_endian (stream, account_block_count);
nano::read (stream, account_conf_frontier);
nano::read_big_endian (stream, account_conf_height);
}

View file

@ -12,6 +12,7 @@
#include <nano/secure/network_filter.hpp>
#include <bitset>
#include <variant>
namespace nano
{
@ -34,7 +35,9 @@ enum class message_type : uint8_t
node_id_handshake = 0x0a,
bulk_pull_account = 0x0b,
telemetry_req = 0x0c,
telemetry_ack = 0x0d
telemetry_ack = 0x0d,
asc_pull_req = 0x0e,
asc_pull_ack = 0x0f,
};
std::string to_string (message_type);
@ -98,11 +101,15 @@ public:
explicit message (nano::network_constants const &, nano::message_type);
explicit message (nano::message_header const &);
virtual ~message () = default;
virtual void serialize (nano::stream &) const = 0;
virtual void visit (nano::message_visitor &) const = 0;
std::shared_ptr<std::vector<uint8_t>> to_bytes () const;
nano::shared_const_buffer to_shared_const_buffer () const;
nano::message_type type () const;
public:
nano::message_header header;
};
@ -349,6 +356,173 @@ public:
static std::size_t size (nano::message_header const &);
};
/**
* Type of requested asc pull data
* - blocks:
* - account_info:
*/
enum class asc_pull_type : uint8_t
{
invalid = 0x0,
blocks = 0x1,
account_info = 0x2,
};
class empty_payload
{
public:
void serialize (nano::stream &) const
{
debug_assert (false);
}
void deserialize (nano::stream &)
{
debug_assert (false);
}
};
/**
* Ascending bootstrap pull request
*/
class asc_pull_req final : public message
{
public:
using id_t = uint64_t;
explicit asc_pull_req (nano::network_constants const &);
asc_pull_req (bool & error, nano::stream &, nano::message_header const &);
void serialize (nano::stream &) const override;
bool deserialize (nano::stream &);
void visit (nano::message_visitor &) const override;
static std::size_t size (nano::message_header const &);
/**
* Update payload size stored in header
* IMPORTANT: Must be called after any update to the payload
*/
void update_header ();
void serialize_payload (nano::stream &) const;
void deserialize_payload (nano::stream &);
private: // Debug
/**
* Asserts that payload type is consistent with actual payload
*/
bool verify_consistency () const;
public: // Payload definitions
class blocks_payload
{
public:
void serialize (nano::stream &) const;
void deserialize (nano::stream &);
public:
nano::hash_or_account start{ 0 };
uint8_t count{ 0 };
};
class account_info_payload
{
public:
void serialize (nano::stream &) const;
void deserialize (nano::stream &);
public:
nano::hash_or_account target{ 0 };
};
public: // Payload
/** Currently unused, allows extensions in the future */
asc_pull_type type{ asc_pull_type::invalid };
id_t id{ 0 };
/** Payload depends on `asc_pull_type` */
std::variant<empty_payload, blocks_payload, account_info_payload> payload;
public:
/** Size of message without payload */
constexpr static std::size_t partial_size = sizeof (type) + sizeof (id);
};
/**
* Ascending bootstrap pull response
*/
class asc_pull_ack final : public message
{
public:
using id_t = asc_pull_req::id_t;
explicit asc_pull_ack (nano::network_constants const &);
asc_pull_ack (bool & error, nano::stream &, nano::message_header const &);
void serialize (nano::stream &) const override;
bool deserialize (nano::stream &);
void visit (nano::message_visitor &) const override;
static std::size_t size (nano::message_header const &);
/**
* Update payload size stored in header
* IMPORTANT: Must be called after any update to the payload
*/
void update_header ();
void serialize_payload (nano::stream &) const;
void deserialize_payload (nano::stream &);
private: // Debug
/**
* Asserts that payload type is consistent with actual payload
*/
bool verify_consistency () const;
public: // Payload definitions
class blocks_payload
{
public:
void serialize (nano::stream &) const;
void deserialize (nano::stream &);
public:
std::vector<std::shared_ptr<nano::block>> blocks{};
public:
/* Header allows for 16 bit extensions; 65535 bytes / 500 bytes (block size with some future margin) ~ 131 */
constexpr static std::size_t max_blocks = 128;
};
class account_info_payload
{
public:
void serialize (nano::stream &) const;
void deserialize (nano::stream &);
public:
nano::account account{ 0 };
nano::block_hash account_open{ 0 };
nano::block_hash account_head{ 0 };
uint64_t account_block_count{ 0 };
nano::block_hash account_conf_frontier{ 0 };
uint64_t account_conf_height{ 0 };
};
public: // Payload
/** Currently unused, allows extensions in the future */
asc_pull_type type{ asc_pull_type::invalid };
id_t id{ 0 };
/** Payload depends on `asc_pull_type` */
std::variant<empty_payload, blocks_payload, account_info_payload> payload;
public:
/** Size of message without payload */
constexpr static std::size_t partial_size = sizeof (type) + sizeof (id);
};
class message_visitor
{
public:
@ -398,6 +572,14 @@ public:
{
default_handler (message);
}
virtual void asc_pull_req (nano::asc_pull_req const & message)
{
default_handler (message);
}
virtual void asc_pull_ack (nano::asc_pull_ack const & message)
{
default_handler (message);
}
virtual void default_handler (nano::message const &){};
};
}

View file

@ -21,7 +21,6 @@ nano::network::network (nano::node & node_a, uint16_t port_a) :
} },
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
resolver (node_a.io_ctx),
limiter (node_a.config.bandwidth_limit_burst_ratio, node_a.config.bandwidth_limit),
tcp_message_manager (node_a.config.tcp_incoming_connections_max),
node (node_a),
publish_filter (256 * 1024),
@ -410,13 +409,14 @@ public:
channel (channel_a)
{
}
void keepalive (nano::keepalive const & message_a) override
{
if (node.config.logging.network_keepalive_logging ())
{
node.logger.try_log (boost::str (boost::format ("Received keepalive message from %1%") % channel->to_string ()));
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in);
node.network.merge_peers (message_a.peers);
// Check for special node port data
@ -430,13 +430,14 @@ public:
channel->set_peering_endpoint (new_endpoint);
}
}
void publish (nano::publish const & message_a) override
{
if (node.config.logging.network_message_logging ())
{
node.logger.try_log (boost::str (boost::format ("Publish message from %1% for %2%") % channel->to_string () % message_a.block->hash ().to_string ()));
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in);
if (!node.block_processor.full ())
{
node.process_active (message_a.block);
@ -447,6 +448,7 @@ public:
node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in);
}
}
void confirm_req (nano::confirm_req const & message_a) override
{
if (node.config.logging.network_message_logging ())
@ -460,7 +462,7 @@ public:
node.logger.try_log (boost::str (boost::format ("Confirm_req message from %1% for %2%") % channel->to_string () % message_a.block->hash ().to_string ()));
}
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::in);
// Don't load nodes with disabled voting
if (node.config.enable_voting && node.wallets.reps ().voting > 0)
{
@ -474,45 +476,51 @@ public:
}
}
}
void confirm_ack (nano::confirm_ack const & message_a) override
{
if (node.config.logging.network_message_logging ())
{
node.logger.try_log (boost::str (boost::format ("Received confirm_ack message from %1% for %2% timestamp %3%") % channel->to_string () % message_a.vote->hashes_string () % std::to_string (message_a.vote->timestamp ())));
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in);
if (!message_a.vote->account.is_zero ())
{
node.vote_processor.vote (message_a.vote, channel);
}
}
void bulk_pull (nano::bulk_pull const &) override
{
debug_assert (false);
}
void bulk_pull_account (nano::bulk_pull_account const &) override
{
debug_assert (false);
}
void bulk_push (nano::bulk_push const &) override
{
debug_assert (false);
}
void frontier_req (nano::frontier_req const &) override
{
debug_assert (false);
}
void node_id_handshake (nano::node_id_handshake const & message_a) override
{
node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
}
void telemetry_req (nano::telemetry_req const & message_a) override
{
if (node.config.logging.network_telemetry_logging ())
{
node.logger.try_log (boost::str (boost::format ("Telemetry_req message from %1%") % channel->to_string ()));
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in);
// Send an empty telemetry_ack if we do not want, just to acknowledge that we have received the message to
// remove any timeouts on the server side waiting for a message.
@ -524,27 +532,42 @@ public:
}
channel->send (telemetry_ack, nullptr, nano::buffer_drop_policy::no_socket_drop);
}
void telemetry_ack (nano::telemetry_ack const & message_a) override
{
if (node.config.logging.network_telemetry_logging ())
{
node.logger.try_log (boost::str (boost::format ("Received telemetry_ack message from %1%") % channel->to_string ()));
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in);
if (node.telemetry)
{
node.telemetry->set (message_a, *channel);
}
}
void asc_pull_req (nano::asc_pull_req const & message) override
{
node.bootstrap_server.request (message, channel);
}
void asc_pull_ack (nano::asc_pull_ack const & message) override
{
// TODO: Process in ascending bootstrap client
}
private:
nano::node & node;
std::shared_ptr<nano::transport::channel> channel;
};
}
void nano::network::process_message (nano::message const & message_a, std::shared_ptr<nano::transport::channel> const & channel_a)
void nano::network::process_message (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel)
{
network_message_visitor visitor (node, channel_a);
message_a.visit (visitor);
node.stats.inc (nano::stat::type::message, nano::to_stat_detail (message.header.type), nano::stat::dir::in);
network_message_visitor visitor (node, channel);
message.visit (visitor);
}
// Send keepalives to all the peers we've been notified of
@ -727,7 +750,7 @@ std::shared_ptr<nano::transport::channel> nano::network::find_node_id (nano::acc
return result;
}
nano::endpoint nano::network::endpoint ()
nano::endpoint nano::network::endpoint () const
{
return nano::endpoint (boost::asio::ip::address_v6::loopback (), port);
}
@ -808,11 +831,6 @@ void nano::network::erase (nano::transport::channel const & channel_a)
}
}
void nano::network::set_bandwidth_params (double limit_burst_ratio_a, std::size_t limit_a)
{
limiter.reset (limit_burst_ratio_a, limit_a);
}
nano::message_buffer_manager::message_buffer_manager (nano::stat & stats_a, std::size_t size, std::size_t count) :
stats (stats_a),
free (count),

View file

@ -11,6 +11,7 @@
#include <memory>
#include <queue>
#include <unordered_set>
namespace nano
{
class channel;
@ -115,11 +116,13 @@ private:
std::unordered_map<boost::asio::ip::address, unsigned> cookies_per_ip;
std::size_t max_cookies_per_ip;
};
class network final
{
public:
network (nano::node &, uint16_t);
~network ();
nano::networks id;
void start ();
void stop ();
@ -148,7 +151,7 @@ public:
bool not_a_peer (nano::endpoint const &, bool);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t, uint8_t = 0, bool = true);
std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count = 0, uint8_t = 0, bool = true);
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t);
// Desired fanout for a given scale
std::size_t fanout (float scale = 1.0f) const;
@ -158,7 +161,7 @@ public:
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t, uint8_t = 0, bool = false) const;
// Get the next peer for attempting a tcp bootstrap connection
nano::tcp_endpoint bootstrap_peer (bool = false);
nano::endpoint endpoint ();
nano::endpoint endpoint () const;
void cleanup (std::chrono::steady_clock::time_point const &);
void ongoing_cleanup ();
// Node ID cookies cleanup
@ -169,7 +172,6 @@ public:
float size_sqrt () const;
bool empty () const;
void erase (nano::transport::channel const &);
void set_bandwidth_params (double, std::size_t);
static std::string to_string (nano::networks);
private:
@ -180,7 +182,6 @@ public:
nano::message_buffer_manager buffer_container;
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
nano::bandwidth_limiter limiter;
nano::peer_exclusion excluded_peers;
nano::tcp_message_manager tcp_message_manager;
nano::node & node;

View file

@ -31,6 +31,10 @@ extern unsigned char nano_bootstrap_weights_beta[];
extern std::size_t nano_bootstrap_weights_beta_size;
}
/*
* Configs
*/
nano::backlog_population::config nano::nodeconfig_to_backlog_population_config (const nano::node_config & config)
{
nano::backlog_population::config cfg;
@ -53,6 +57,20 @@ nano::hinted_scheduler::config nano::nodeconfig_to_hinted_scheduler_config (cons
return cfg;
}
nano::outbound_bandwidth_limiter::config nano::outbound_bandwidth_limiter_config (const nano::node_config & config)
{
outbound_bandwidth_limiter::config cfg;
cfg.standard_limit = config.bandwidth_limit;
cfg.standard_burst_ratio = config.bandwidth_limit_burst_ratio;
cfg.bootstrap_limit = config.bootstrap_bandwidth_limit;
cfg.bootstrap_burst_ratio = config.bootstrap_bandwidth_burst_ratio;
return cfg;
}
/*
* node
*/
void nano::node::keepalive (std::string const & address_a, uint16_t port_a)
{
auto node_l (shared_from_this ());
@ -148,12 +166,14 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
gap_cache (*this),
ledger (store, stats, network_params.ledger, flags_a.generate_cache),
checker (config.signature_checker_threads),
outbound_limiter{ outbound_bandwidth_limiter_config (config) },
// empty `config.peering_port` means the user made no port choice at all;
// otherwise, any value is considered, with `0` having the special meaning of 'let the OS pick a port instead'
//
network (*this, config.peering_port.has_value () ? *config.peering_port : 0),
telemetry (std::make_shared<nano::telemetry> (network, workers, observers.telemetry, stats, network_params, flags.disable_ongoing_telemetry_requests)),
bootstrap_initiator (*this),
bootstrap_server{ store, ledger, network_params.network, stats },
// BEWARE: `bootstrap` takes `network.port` instead of `config.peering_port` because when the user doesn't specify
// a peering port and wants the OS to pick one, the picking happens when `network` gets initialized
// (if UDP is active, otherwise it happens when `bootstrap` gets initialized), so then for TCP traffic
@ -749,6 +769,7 @@ void nano::node::start ()
final_generator.start ();
backlog.start ();
hinting.start ();
bootstrap_server.start ();
}
void nano::node::stop ()
@ -775,6 +796,7 @@ void nano::node::stop ()
{
websocket_server->stop ();
}
bootstrap_server.stop ();
bootstrap_initiator.stop ();
tcp_listener.stop ();
port_mapping.stop ();
@ -1500,7 +1522,7 @@ void nano::node::set_bandwidth_params (std::size_t limit, double ratio)
{
config.bandwidth_limit_burst_ratio = ratio;
config.bandwidth_limit = limit;
network.set_bandwidth_params (limit, ratio);
outbound_limiter.reset (limit, ratio);
logger.always_log (boost::str (boost::format ("set_bandwidth_params(%1%, %2%)") % limit % ratio));
}

View file

@ -5,10 +5,12 @@
#include <nano/lib/work.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/backlog_population.hpp>
#include <nano/node/bandwidth_limiter.hpp>
#include <nano/node/block_arrival.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
#include <nano/node/bootstrap/bootstrap_server.hpp>
#include <nano/node/confirmation_height_processor.hpp>
#include <nano/node/distributed_work_factory.hpp>
#include <nano/node/election.hpp>
@ -59,6 +61,7 @@ std::unique_ptr<container_info_component> collect_container_info (rep_crawler &
backlog_population::config nodeconfig_to_backlog_population_config (node_config const &);
vote_cache::config nodeconfig_to_vote_cache_config (node_config const &, node_flags const &);
hinted_scheduler::config nodeconfig_to_hinted_scheduler_config (node_config const &);
outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &);
class node final : public std::enable_shared_from_this<nano::node>
{
@ -154,9 +157,11 @@ public:
nano::gap_cache gap_cache;
nano::ledger ledger;
nano::signature_checker checker;
nano::outbound_bandwidth_limiter outbound_limiter;
nano::network network;
std::shared_ptr<nano::telemetry> telemetry;
nano::bootstrap_initiator bootstrap_initiator;
nano::bootstrap_server bootstrap_server;
nano::transport::tcp_listener tcp_listener;
boost::filesystem::path application_path;
nano::node_observers observers;

View file

@ -115,8 +115,13 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
toml.put ("use_memory_pools", use_memory_pools, "If true, allocate memory from memory pools. Enabling this may improve performance. Memory is never released to the OS.\ntype:bool");
toml.put ("confirmation_history_size", confirmation_history_size, "Maximum confirmation history size. If tracking the rate of block confirmations, the websocket feature is recommended instead.\ntype:uint64");
toml.put ("active_elections_size", active_elections_size, "Number of active elections. Elections beyond this limit have limited survival time.\nWarning: modifying this value may result in a lower confirmation rate.\ntype:uint64,[250..]");
toml.put ("bandwidth_limit", bandwidth_limit, "Outbound traffic limit in bytes/sec after which messages will be dropped.\nNote: changing to unlimited bandwidth (0) is not recommended for limited connections.\ntype:uint64");
toml.put ("bandwidth_limit_burst_ratio", bandwidth_limit_burst_ratio, "Burst ratio for outbound traffic shaping.\ntype:double");
toml.put ("bootstrap_bandwidth_limit", bootstrap_bandwidth_limit, "Outbound bootstrap traffic limit in bytes/sec after which messages will be dropped.\nNote: changing to unlimited bandwidth (0) is not recommended for limited connections.\ntype:uint64");
toml.put ("bootstrap_bandwidth_burst_ratio", bootstrap_bandwidth_burst_ratio, "Burst ratio for outbound bootstrap traffic.\ntype:double");
toml.put ("conf_height_processor_batch_min_time", conf_height_processor_batch_min_time.count (), "Minimum write batching time when there are blocks pending confirmation height.\ntype:milliseconds");
toml.put ("backup_before_upgrade", backup_before_upgrade, "Backup the ledger database before performing upgrades.\nWarning: uses more disk storage and increases startup time when upgrading.\ntype:bool");
toml.put ("max_work_generate_multiplier", max_work_generate_multiplier, "Maximum allowed difficulty multiplier for work generation.\ntype:double,[1..]");
@ -360,8 +365,13 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
toml.get<bool> ("use_memory_pools", use_memory_pools);
toml.get<std::size_t> ("confirmation_history_size", confirmation_history_size);
toml.get<std::size_t> ("active_elections_size", active_elections_size);
toml.get<std::size_t> ("bandwidth_limit", bandwidth_limit);
toml.get<double> ("bandwidth_limit_burst_ratio", bandwidth_limit_burst_ratio);
toml.get<std::size_t> ("bootstrap_bandwidth_limit", bootstrap_bandwidth_limit);
toml.get<double> ("bootstrap_bandwidth_burst_ratio", bootstrap_bandwidth_burst_ratio);
toml.get<bool> ("backup_before_upgrade", backup_before_upgrade);
auto conf_height_processor_batch_min_time_l (conf_height_processor_batch_min_time.count ());

View file

@ -95,6 +95,10 @@ public:
std::size_t bandwidth_limit{ 10 * 1024 * 1024 };
/** By default, allow bursts of 15MB/s (not sustainable) */
double bandwidth_limit_burst_ratio{ 3. };
/** Default boostrap outbound traffic limit is 16MB/s ~ 128Mbit/s */
std::size_t bootstrap_bandwidth_limit{ 16 * 1024 * 1024 };
/** Bootstrap traffic does not need bursts */
double bootstrap_bandwidth_burst_ratio{ 1. };
std::chrono::milliseconds conf_height_processor_batch_min_time{ 50 };
bool backup_before_upgrade{ false };
double max_work_generate_multiplier{ 64. };

View file

@ -178,6 +178,14 @@ std::unique_ptr<nano::message> nano::transport::message_deserializer::deserializ
{
return deserialize_frontier_req (stream, header);
}
case nano::message_type::asc_pull_req:
{
return deserialize_asc_pull_req (stream, header);
}
case nano::message_type::asc_pull_ack:
{
return deserialize_asc_pull_ack (stream, header);
}
default:
{
status = parse_status::invalid_message_type;
@ -191,7 +199,7 @@ std::unique_ptr<nano::keepalive> nano::transport::message_deserializer::deserial
{
auto error = false;
auto incoming = std::make_unique<nano::keepalive> (error, stream, header);
if (!error && at_end (stream))
if (!error && nano::at_end (stream))
{
return incoming;
}
@ -206,7 +214,7 @@ std::unique_ptr<nano::publish> nano::transport::message_deserializer::deserializ
{
auto error = false;
auto incoming = std::make_unique<nano::publish> (error, stream, header, digest_a, &block_uniquer_m);
if (!error && at_end (stream))
if (!error && nano::at_end (stream))
{
release_assert (incoming->block);
if (!network_constants_m.work.validate_entry (*incoming->block))
@ -229,7 +237,7 @@ std::unique_ptr<nano::confirm_req> nano::transport::message_deserializer::deseri
{
auto error = false;
auto incoming = std::make_unique<nano::confirm_req> (error, stream, header, &block_uniquer_m);
if (!error && at_end (stream))
if (!error && nano::at_end (stream))
{
if (incoming->block == nullptr || !network_constants_m.work.validate_entry (*incoming->block))
{
@ -251,7 +259,7 @@ std::unique_ptr<nano::confirm_ack> nano::transport::message_deserializer::deseri
{
auto error = false;
auto incoming = std::make_unique<nano::confirm_ack> (error, stream, header, &vote_uniquer_m);
if (!error && at_end (stream))
if (!error && nano::at_end (stream))
{
return incoming;
}
@ -266,7 +274,7 @@ std::unique_ptr<nano::node_id_handshake> nano::transport::message_deserializer::
{
bool error = false;
auto incoming = std::make_unique<nano::node_id_handshake> (error, stream, header);
if (!error && at_end (stream))
if (!error && nano::at_end (stream))
{
return incoming;
}
@ -303,7 +311,7 @@ std::unique_ptr<nano::bulk_pull> nano::transport::message_deserializer::deserial
{
bool error = false;
auto incoming = std::make_unique<nano::bulk_pull> (error, stream, header);
if (!error && at_end (stream))
if (!error && nano::at_end (stream))
{
return incoming;
}
@ -318,7 +326,7 @@ std::unique_ptr<nano::bulk_pull_account> nano::transport::message_deserializer::
{
bool error = false;
auto incoming = std::make_unique<nano::bulk_pull_account> (error, stream, header);
if (!error && at_end (stream))
if (!error && nano::at_end (stream))
{
return incoming;
}
@ -333,7 +341,7 @@ std::unique_ptr<nano::frontier_req> nano::transport::message_deserializer::deser
{
bool error = false;
auto incoming = std::make_unique<nano::frontier_req> (error, stream, header);
if (!error && at_end (stream))
if (!error && nano::at_end (stream))
{
return incoming;
}
@ -350,15 +358,41 @@ std::unique_ptr<nano::bulk_push> nano::transport::message_deserializer::deserial
return std::make_unique<nano::bulk_push> (header);
}
bool nano::transport::message_deserializer::at_end (nano::stream & stream)
std::unique_ptr<nano::asc_pull_req> nano::transport::message_deserializer::deserialize_asc_pull_req (nano::stream & stream, const nano::message_header & header)
{
uint8_t junk;
auto end (nano::try_read (stream, junk));
return end;
bool error = false;
auto incoming = std::make_unique<nano::asc_pull_req> (error, stream, header);
// Intentionally not checking if at the end of stream, because these messages support backwards/forwards compatibility
if (!error)
{
return incoming;
}
else
{
status = parse_status::invalid_asc_pull_req_message;
}
return {};
}
nano::stat::detail nano::transport::message_deserializer::parse_status_to_stat_detail ()
std::unique_ptr<nano::asc_pull_ack> nano::transport::message_deserializer::deserialize_asc_pull_ack (nano::stream & stream, const nano::message_header & header)
{
bool error = false;
auto incoming = std::make_unique<nano::asc_pull_ack> (error, stream, header);
// Intentionally not checking if at the end of stream, because these messages support backwards/forwards compatibility
if (!error)
{
return incoming;
}
else
{
status = parse_status::invalid_asc_pull_ack_message;
}
return {};
}
nano::stat::detail nano::transport::message_deserializer::to_stat_detail (parse_status status)
{
// Keep additional `break` for readability
switch (status)
{
case parse_status::none:
@ -366,84 +400,133 @@ nano::stat::detail nano::transport::message_deserializer::parse_status_to_stat_d
break;
case parse_status::insufficient_work:
return stat::detail::insufficient_work;
break;
case parse_status::invalid_header:
return stat::detail::invalid_header;
break;
case parse_status::invalid_message_type:
return stat::detail::invalid_message_type;
break;
case parse_status::invalid_keepalive_message:
return stat::detail::invalid_keepalive_message;
break;
case parse_status::invalid_publish_message:
return stat::detail::invalid_publish_message;
break;
case parse_status::invalid_confirm_req_message:
return stat::detail::invalid_confirm_req_message;
break;
case parse_status::invalid_confirm_ack_message:
return stat::detail::invalid_confirm_ack_message;
break;
case parse_status::invalid_node_id_handshake_message:
return stat::detail::invalid_node_id_handshake_message;
break;
case parse_status::invalid_telemetry_req_message:
return stat::detail::invalid_telemetry_req_message;
break;
case parse_status::invalid_telemetry_ack_message:
return stat::detail::invalid_telemetry_ack_message;
break;
case parse_status::invalid_bulk_pull_message:
return stat::detail::invalid_bulk_pull_message;
break;
case parse_status::invalid_bulk_pull_account_message:
return stat::detail::invalid_bulk_pull_account_message;
break;
case parse_status::invalid_frontier_req_message:
return stat::detail::invalid_frontier_req_message;
break;
case parse_status::invalid_asc_pull_req_message:
return stat::detail::invalid_asc_pull_req_message;
break;
case parse_status::invalid_asc_pull_ack_message:
return stat::detail::invalid_asc_pull_ack_message;
break;
case parse_status::invalid_network:
return stat::detail::invalid_network;
break;
case parse_status::outdated_version:
return stat::detail::outdated_version;
break;
case parse_status::duplicate_publish_message:
return stat::detail::duplicate_publish;
break;
case parse_status::message_size_too_big:
return stat::detail::message_too_big;
break;
}
return {};
}
std::string nano::transport::message_deserializer::parse_status_to_string ()
std::string nano::transport::message_deserializer::to_string (parse_status status)
{
// Keep additional `break` for readability
switch (status)
{
case parse_status::none:
return "none";
break;
case parse_status::success:
return "success";
break;
case parse_status::insufficient_work:
return "insufficient_work";
break;
case parse_status::invalid_header:
return "invalid_header";
break;
case parse_status::invalid_message_type:
return "invalid_message_type";
break;
case parse_status::invalid_keepalive_message:
return "invalid_keepalive_message";
break;
case parse_status::invalid_publish_message:
return "invalid_publish_message";
break;
case parse_status::invalid_confirm_req_message:
return "invalid_confirm_req_message";
break;
case parse_status::invalid_confirm_ack_message:
return "invalid_confirm_ack_message";
break;
case parse_status::invalid_node_id_handshake_message:
return "invalid_node_id_handshake_message";
break;
case parse_status::invalid_telemetry_req_message:
return "invalid_telemetry_req_message";
break;
case parse_status::invalid_telemetry_ack_message:
return "invalid_telemetry_ack_message";
break;
case parse_status::invalid_bulk_pull_message:
return "invalid_bulk_pull_message";
break;
case parse_status::invalid_bulk_pull_account_message:
return "invalid_bulk_pull_account_message";
break;
case parse_status::invalid_frontier_req_message:
return "invalid_frontier_req_message";
break;
case parse_status::invalid_asc_pull_req_message:
return "invalid_asc_pull_req_message";
break;
case parse_status::invalid_asc_pull_ack_message:
return "invalid_asc_pull_ack_message";
break;
case parse_status::invalid_network:
return "invalid_network";
break;
case parse_status::outdated_version:
return "outdated_version";
break;
case parse_status::duplicate_publish_message:
return "duplicate_publish_message";
break;
case parse_status::message_size_too_big:
return "message_size_too_big";
break;
}
return "n/a";
}

View file

@ -35,6 +35,8 @@ namespace transport
invalid_bulk_pull_message,
invalid_bulk_pull_account_message,
invalid_frontier_req_message,
invalid_asc_pull_req_message,
invalid_asc_pull_ack_message,
invalid_network,
outdated_version,
duplicate_publish_message,
@ -76,24 +78,25 @@ namespace transport
std::unique_ptr<nano::bulk_pull_account> deserialize_bulk_pull_account (nano::stream &, nano::message_header const &);
std::unique_ptr<nano::bulk_push> deserialize_bulk_push (nano::stream &, nano::message_header const &);
std::unique_ptr<nano::frontier_req> deserialize_frontier_req (nano::stream &, nano::message_header const &);
static bool at_end (nano::stream &);
std::unique_ptr<nano::asc_pull_req> deserialize_asc_pull_req (nano::stream &, nano::message_header const &);
std::unique_ptr<nano::asc_pull_ack> deserialize_asc_pull_ack (nano::stream &, nano::message_header const &);
std::shared_ptr<std::vector<uint8_t>> read_buffer;
public:
std::string parse_status_to_string ();
stat::detail parse_status_to_stat_detail ();
private: // Constants
static constexpr std::size_t HEADER_SIZE = 8;
static constexpr std::size_t MAX_MESSAGE_SIZE = 1024 * 4;
static constexpr std::size_t MAX_MESSAGE_SIZE = 1024 * 65;
private: // Dependencies
nano::network_constants const & network_constants_m;
nano::network_filter & publish_filter_m;
nano::block_uniquer & block_uniquer_m;
nano::vote_uniquer & vote_uniquer_m;
public:
static stat::detail to_stat_detail (parse_status);
static std::string to_string (parse_status);
};
}
}

View file

@ -204,11 +204,13 @@ void nano::transport::tcp_server::receive_message ()
if (ec)
{
// IO error or critical error when deserializing message
this_l->node->stats.inc (nano::stat::type::error, this_l->message_deserializer->parse_status_to_stat_detail ());
this_l->node->stats.inc (nano::stat::type::error, nano::transport::message_deserializer::to_stat_detail (this_l->message_deserializer->status));
this_l->stop ();
return;
}
this_l->received_message (std::move (message));
else
{
this_l->received_message (std::move (message));
}
});
}
@ -223,7 +225,7 @@ void nano::transport::tcp_server::received_message (std::unique_ptr<nano::messag
{
// Error while deserializing message
debug_assert (message_deserializer->status != transport::message_deserializer::parse_status::success);
node->stats.inc (nano::stat::type::error, message_deserializer->parse_status_to_stat_detail ());
node->stats.inc (nano::stat::type::error, nano::transport::message_deserializer::to_stat_detail (message_deserializer->status));
if (message_deserializer->status == transport::message_deserializer::parse_status::duplicate_publish_message)
{
node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish);
@ -264,18 +266,8 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
}
else if (handshake_visitor.bootstrap)
{
if (allow_bootstrap)
if (!to_bootstrap_connection ())
{
// Switch to bootstrap connection mode and handle message in subsequent bootstrap visitor
if (!to_bootstrap_connection ())
{
stop ();
return false;
}
}
else
{
// Received bootstrap request in a connection that only allows for realtime traffic, abort
stop ();
return false;
}
@ -471,6 +463,16 @@ void nano::transport::tcp_server::realtime_message_visitor::telemetry_ack (const
process = true;
}
void nano::transport::tcp_server::realtime_message_visitor::asc_pull_req (const nano::asc_pull_req & message)
{
process = true;
}
void nano::transport::tcp_server::realtime_message_visitor::asc_pull_ack (const nano::asc_pull_ack & message)
{
process = true;
}
/*
* Bootstrap
*/
@ -571,13 +573,26 @@ void nano::transport::tcp_server::timeout ()
bool nano::transport::tcp_server::to_bootstrap_connection ()
{
if (socket->type () == nano::socket::type_t::undefined && !node->flags.disable_bootstrap_listener && node->tcp_listener.bootstrap_count < node->config.bootstrap_connections_max)
if (!allow_bootstrap)
{
++node->tcp_listener.bootstrap_count;
socket->type_set (nano::socket::type_t::bootstrap);
return true;
return false;
}
return false;
if (node->flags.disable_bootstrap_listener)
{
return false;
}
if (node->tcp_listener.bootstrap_count >= node->config.bootstrap_connections_max)
{
return false;
}
if (socket->type () != nano::socket::type_t::undefined)
{
return false;
}
++node->tcp_listener.bootstrap_count;
socket->type_set (nano::socket::type_t::bootstrap);
return true;
}
bool nano::transport::tcp_server::to_realtime_connection (nano::account const & node_id)

View file

@ -115,6 +115,8 @@ private:
void frontier_req (nano::frontier_req const &) override;
void telemetry_req (nano::telemetry_req const &) override;
void telemetry_ack (nano::telemetry_ack const &) override;
void asc_pull_req (nano::asc_pull_req const &) override;
void asc_pull_ack (nano::asc_pull_ack const &) override;
private:
tcp_server & server;

View file

@ -51,13 +51,13 @@ nano::transport::channel::channel (nano::node & node_a) :
set_network_version (node_a.network_params.network.protocol_version);
}
void nano::transport::channel::send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
void nano::transport::channel::send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a, nano::bandwidth_limit_type limiter_type)
{
auto buffer (message_a.to_shared_const_buffer ());
auto detail = nano::to_stat_detail (message_a.header.type);
auto is_droppable_by_limiter = drop_policy_a == nano::buffer_drop_policy::limiter;
auto should_drop (node.network.limiter.should_drop (buffer.size ()));
if (!is_droppable_by_limiter || !should_drop)
auto should_pass (node.outbound_limiter.should_pass (buffer.size (), limiter_type));
if (!is_droppable_by_limiter || should_pass)
{
send_buffer (buffer, callback_a, drop_policy_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
@ -209,20 +209,3 @@ bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool
}
return result;
}
using namespace std::chrono_literals;
nano::bandwidth_limiter::bandwidth_limiter (double const limit_burst_ratio_a, std::size_t const limit_a) :
bucket (static_cast<std::size_t> (limit_a * limit_burst_ratio_a), limit_a)
{
}
bool nano::bandwidth_limiter::should_drop (std::size_t const & message_size_a)
{
return !bucket.try_consume (nano::narrow_cast<unsigned int> (message_size_a));
}
void nano::bandwidth_limiter::reset (double const limit_burst_ratio_a, std::size_t const limit_a)
{
bucket.reset (static_cast<std::size_t> (limit_a * limit_burst_ratio_a), limit_a);
}

View file

@ -3,6 +3,7 @@
#include <nano/lib/locks.hpp>
#include <nano/lib/rate_limiting.hpp>
#include <nano/lib/stats.hpp>
#include <nano/node/bandwidth_limiter.hpp>
#include <nano/node/common.hpp>
#include <nano/node/messages.hpp>
#include <nano/node/socket.hpp>
@ -11,18 +12,6 @@
namespace nano
{
class bandwidth_limiter final
{
public:
// initialize with limit 0 = unbounded
bandwidth_limiter (double, std::size_t);
bool should_drop (std::size_t const &);
void reset (double, std::size_t);
private:
nano::rate::token_bucket bucket;
};
namespace transport
{
nano::endpoint map_endpoint_to_v6 (nano::endpoint const &);
@ -52,7 +41,7 @@ namespace transport
virtual ~channel () = default;
virtual std::size_t hash_code () const = 0;
virtual bool operator== (nano::transport::channel const &) const = 0;
void send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a = nullptr, nano::buffer_drop_policy policy_a = nano::buffer_drop_policy::limiter);
void send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a = nullptr, nano::buffer_drop_policy policy_a = nano::buffer_drop_policy::limiter, nano::bandwidth_limit_type = nano::bandwidth_limit_type::standard);
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
//
virtual void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) = 0;

View file

@ -222,13 +222,6 @@ nano::keypair::keypair (std::string const & prv_a)
ed25519_publickey (prv.bytes.data (), pub.bytes.data ());
}
// Serialize a block prefixed with an 8-bit typecode
void nano::serialize_block (nano::stream & stream_a, nano::block const & block_a)
{
write (stream_a, block_a.type ());
block_a.serialize (stream_a);
}
nano::account_info::account_info (nano::block_hash const & head_a, nano::account const & representative_a, nano::block_hash const & open_block_a, nano::amount const & balance_a, uint64_t modified_a, uint64_t block_count_a, nano::epoch epoch_a) :
head (head_a),
representative (representative_a),

View file

@ -595,7 +595,7 @@ void ledger_processor::receive_block (nano::receive_block & block_a)
if (result.code == nano::process_result::progress)
{
auto account (ledger.store.frontier.get (transaction, block_a.hashables.previous));
result.code = account.is_zero () ? nano::process_result::gap_previous : nano::process_result::progress; //Have we seen the previous block? No entries for account at all (Harmless)
result.code = account.is_zero () ? nano::process_result::gap_previous : nano::process_result::progress; // Have we seen the previous block? No entries for account at all (Harmless)
if (result.code == nano::process_result::progress)
{
// Validate block if not verified outside of ledger
@ -1081,7 +1081,6 @@ bool nano::ledger::rollback (nano::write_transaction const & transaction_a, nano
return rollback (transaction_a, block_a, rollback_list);
}
// Return account containing hash
nano::account nano::ledger::account (nano::transaction const & transaction_a, nano::block_hash const & hash_a) const
{
return store.block.account (transaction_a, hash_a);
@ -1108,6 +1107,19 @@ nano::account nano::ledger::account_safe (nano::transaction const & transaction_
}
}
nano::account nano::ledger::account_safe (const nano::transaction & transaction, const nano::block_hash & hash) const
{
auto block = store.block.get (transaction, hash);
if (block)
{
return store.block.account_calculated (*block);
}
else
{
return { 0 };
}
}
// Return amount decrease or increase for block
nano::uint128_t nano::ledger::amount (nano::transaction const & transaction_a, nano::account const & account_a)
{

View file

@ -28,8 +28,19 @@ class ledger final
{
public:
ledger (nano::store &, nano::stat &, nano::ledger_constants & constants, nano::generate_cache const & = nano::generate_cache ());
/**
* Return account containing hash, expects that block hash exists in ledger
*/
nano::account account (nano::transaction const &, nano::block_hash const &) const;
/**
* For non-prunning nodes same as `ledger::account()`
* For prunning nodes ensures that block hash exists, otherwise returns zero account
*/
nano::account account_safe (nano::transaction const &, nano::block_hash const &, bool &) const;
/**
* Return account containing hash, returns zero account if account can not be found
*/
nano::account account_safe (nano::transaction const &, nano::block_hash const &) const;
nano::uint128_t amount (nano::transaction const &, nano::account const &);
nano::uint128_t amount (nano::transaction const &, nano::block_hash const &);
/** Safe for previous block, but block hash_a must exist */

View file

@ -167,4 +167,32 @@ auto nano::unchecked_store::equal_range (nano::transaction const & transaction,
auto nano::unchecked_store::full_range (nano::transaction const & transaction) -> std::pair<iterator, iterator>
{
return std::make_pair (begin (transaction), end ());
}
std::optional<nano::account_info> nano::account_store::get (const nano::transaction & transaction, const nano::account & account)
{
nano::account_info info;
bool error = get (transaction, account, info);
if (!error)
{
return info;
}
else
{
return std::nullopt;
}
}
std::optional<nano::confirmation_height_info> nano::confirmation_height_store::get (const nano::transaction & transaction, const nano::account & account)
{
nano::confirmation_height_info info;
bool error = get (transaction, account, info);
if (!error)
{
return info;
}
else
{
return std::nullopt;
}
}

View file

@ -643,6 +643,7 @@ class account_store
public:
virtual void put (nano::write_transaction const &, nano::account const &, nano::account_info const &) = 0;
virtual bool get (nano::transaction const &, nano::account const &, nano::account_info &) = 0;
std::optional<nano::account_info> get (nano::transaction const &, nano::account const &);
virtual void del (nano::write_transaction const &, nano::account const &) = 0;
virtual bool exists (nano::transaction const &, nano::account const &) = 0;
virtual size_t count (nano::transaction const &) = 0;
@ -732,7 +733,7 @@ public:
* Ruturns true on error, false on success.
*/
virtual bool get (nano::transaction const & transaction_a, nano::account const & account_a, nano::confirmation_height_info & confirmation_height_info_a) = 0;
std::optional<nano::confirmation_height_info> get (nano::transaction const & transaction_a, nano::account const & account_a);
virtual bool exists (nano::transaction const & transaction_a, nano::account const & account_a) const = 0;
virtual void del (nano::write_transaction const & transaction_a, nano::account const & account_a) = 0;
virtual uint64_t count (nano::transaction const & transaction_a) = 0;

View file

@ -38,6 +38,23 @@ void nano::test::wait_peer_connections (nano::test::system & system_a)
wait_peer_count (false);
}
nano::hash_or_account nano::test::random_hash_or_account ()
{
nano::hash_or_account random_hash;
nano::random_pool::generate_block (random_hash.bytes.data (), random_hash.bytes.size ());
return random_hash;
}
nano::block_hash nano::test::random_hash ()
{
return nano::test::random_hash_or_account ().as_block_hash ();
}
nano::account nano::test::random_account ()
{
return nano::test::random_hash_or_account ().as_account ();
}
bool nano::test::process (nano::node & node, std::vector<std::shared_ptr<nano::block>> blocks)
{
auto const transaction = node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending });

View file

@ -272,6 +272,19 @@ namespace test
void wait_peer_connections (nano::test::system &);
/**
* Generate a random block hash
*/
nano::hash_or_account random_hash_or_account ();
/**
* Generate a random block hash
*/
nano::block_hash random_hash ();
/**
* Generate a random block hash
*/
nano::account random_account ();
/**
Convenience function to call `node::process` function for multiple blocks at once.
@return true if all blocks were successfully processed and inserted into ledger