diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 242e0c0b..441b3bd4 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -8,6 +8,7 @@ add_executable( block_store.cpp blockprocessor.cpp bootstrap.cpp + bootstrap_server.cpp cli.cpp confirmation_height.cpp confirmation_solicitor.cpp diff --git a/nano/core_test/bootstrap_server.cpp b/nano/core_test/bootstrap_server.cpp new file mode 100644 index 00000000..7685066c --- /dev/null +++ b/nano/core_test/bootstrap_server.cpp @@ -0,0 +1,519 @@ +#include +#include + +#include + +#include + +using namespace std::chrono_literals; + +namespace +{ +using block_list_t = std::vector>; + +/** + * Creates `block_count` random send blocks in an account chain + */ +block_list_t setup_chain (nano::test::system & system, nano::node & node, nano::keypair key, int block_count) +{ + auto latest = node.latest (key.pub); + auto balance = node.balance (key.pub); + + std::vector> blocks; + for (int n = 0; n < block_count; ++n) + { + nano::keypair throwaway; + nano::block_builder builder; + + balance -= 1; + auto send = builder + .send () + .previous (latest) + .destination (throwaway.pub) + .balance (balance) + .sign (key.prv, key.pub) + .work (*system.work.generate (latest)) + .build_shared (); + + latest = send->hash (); + blocks.push_back (send); + } + + EXPECT_TRUE (nano::test::process (node, blocks)); + // Confirm whole chain at once + EXPECT_TRUE (nano::test::confirm (node, { blocks.back () })); + EXPECT_TIMELY (5s, nano::test::confirmed (node, blocks)); + + return blocks; +} + +/** + * Creates `count` account chains, each with `block_count` blocks + */ +std::vector> setup_chains (nano::test::system & system, nano::node & node, int count, int block_count) +{ + auto latest = node.latest (nano::dev::genesis_key.pub); + auto balance = node.balance (nano::dev::genesis_key.pub); + + std::vector> chains; + for (int n = 0; n < count; ++n) + { + nano::keypair key; + nano::block_builder builder; + + balance -= block_count * 2; // Send enough to later create `block_count` blocks + auto send = builder + .send () + .previous (latest) + .destination (key.pub) + .balance (balance) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (latest)) + .build_shared (); + + auto open = builder + .open () + .source (send->hash ()) + .representative (key.pub) + .account (key.pub) + .sign (key.prv, key.pub) + .work (*system.work.generate (key.pub)) + .build_shared (); + + latest = send->hash (); + + // Ensure blocks are in the ledger and confirmed + EXPECT_TRUE (nano::test::process (node, { send, open })); + EXPECT_TRUE (nano::test::confirm (node, { send, open })); + EXPECT_TIMELY (5s, nano::test::confirmed (node, { send, open })); + + auto added_blocks = setup_chain (system, node, key, block_count); + + auto blocks = block_list_t{ open }; + blocks.insert (blocks.end (), added_blocks.begin (), added_blocks.end ()); + + chains.emplace_back (key.pub, blocks); + } + + return chains; +} + +/** + * Helper to track responses in thread safe way + */ +class responses_helper final +{ +public: + void add (nano::asc_pull_ack & ack) + { + nano::lock_guard lock{ mutex }; + responses.push_back (ack); + } + + std::vector get () + { + nano::lock_guard lock{ mutex }; + return responses; + } + + std::size_t size () + { + nano::lock_guard lock{ mutex }; + return responses.size (); + } + +private: + nano::mutex mutex; + std::vector responses; +}; + +/** + * Checks if both lists contain the same blocks, with `blocks_b` skipped by `skip` elements + */ +bool compare_blocks (std::vector> blocks_a, std::vector> blocks_b, int skip = 0) +{ + debug_assert (blocks_b.size () >= blocks_a.size () + skip); + + const auto count = blocks_a.size (); + for (int n = 0; n < count; ++n) + { + auto & block_a = *blocks_a[n]; + auto & block_b = *blocks_b[n + skip]; + + // nano::block does not have != operator + if (!(block_a == block_b)) + { + return false; + } + } + return true; +} +} + +TEST (bootstrap_server, serve_account_blocks) +{ + nano::test::system system{}; + auto & node = *system.add_node (); + + responses_helper responses; + node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) { + responses.add (response); + }); + + auto chains = setup_chains (system, node, 1, 128); + auto [first_account, first_blocks] = chains.front (); + + // Request blocks from account root + nano::asc_pull_req request{ node.network_params.network }; + request.id = 7; + request.type = nano::asc_pull_type::blocks; + + nano::asc_pull_req::blocks_payload request_payload; + request_payload.start = first_account; + request_payload.count = nano::bootstrap_server::max_blocks; + + request.payload = request_payload; + request.update_header (); + + node.network.inbound (request, nano::test::fake_channel (node)); + + ASSERT_TIMELY (5s, responses.size () == 1); + + auto response = responses.get ().front (); + // Ensure we got response exactly for what we asked for + ASSERT_EQ (response.id, 7); + ASSERT_EQ (response.type, nano::asc_pull_type::blocks); + + nano::asc_pull_ack::blocks_payload response_payload; + ASSERT_NO_THROW (response_payload = std::get (response.payload)); + ASSERT_EQ (response_payload.blocks.size (), 128); + ASSERT_TRUE (compare_blocks (response_payload.blocks, first_blocks)); + + // Ensure we don't get any unexpected responses + ASSERT_ALWAYS (1s, responses.size () == 1); +} + +TEST (bootstrap_server, serve_hash) +{ + nano::test::system system{}; + auto & node = *system.add_node (); + + responses_helper responses; + node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) { + responses.add (response); + }); + + auto chains = setup_chains (system, node, 1, 256); + auto [account, blocks] = chains.front (); + + // Skip a few blocks to request hash in the middle of the chain + blocks = block_list_t (std::next (blocks.begin (), 9), blocks.end ()); + + // Request blocks from the middle of the chain + nano::asc_pull_req request{ node.network_params.network }; + request.id = 7; + request.type = nano::asc_pull_type::blocks; + + nano::asc_pull_req::blocks_payload request_payload; + request_payload.start = blocks.front ()->hash (); + request_payload.count = nano::bootstrap_server::max_blocks; + + request.payload = request_payload; + request.update_header (); + + node.network.inbound (request, nano::test::fake_channel (node)); + + ASSERT_TIMELY (5s, responses.size () == 1); + + auto response = responses.get ().front (); + // Ensure we got response exactly for what we asked for + ASSERT_EQ (response.id, 7); + ASSERT_EQ (response.type, nano::asc_pull_type::blocks); + + nano::asc_pull_ack::blocks_payload response_payload; + ASSERT_NO_THROW (response_payload = std::get (response.payload)); + ASSERT_EQ (response_payload.blocks.size (), 128); + ASSERT_TRUE (compare_blocks (response_payload.blocks, blocks)); + + // Ensure we don't get any unexpected responses + ASSERT_ALWAYS (1s, responses.size () == 1); +} + +TEST (bootstrap_server, serve_hash_one) +{ + nano::test::system system{}; + auto & node = *system.add_node (); + + responses_helper responses; + node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) { + responses.add (response); + }); + + auto chains = setup_chains (system, node, 1, 256); + auto [account, blocks] = chains.front (); + + // Skip a few blocks to request hash in the middle of the chain + blocks = block_list_t (std::next (blocks.begin (), 9), blocks.end ()); + + // Request blocks from the middle of the chain + nano::asc_pull_req request{ node.network_params.network }; + request.id = 7; + request.type = nano::asc_pull_type::blocks; + + nano::asc_pull_req::blocks_payload request_payload; + request_payload.start = blocks.front ()->hash (); + request_payload.count = 1; + + request.payload = request_payload; + request.update_header (); + + node.network.inbound (request, nano::test::fake_channel (node)); + + ASSERT_TIMELY (5s, responses.size () == 1); + + auto response = responses.get ().front (); + // Ensure we got response exactly for what we asked for + ASSERT_EQ (response.id, 7); + ASSERT_EQ (response.type, nano::asc_pull_type::blocks); + + nano::asc_pull_ack::blocks_payload response_payload; + ASSERT_NO_THROW (response_payload = std::get (response.payload)); + ASSERT_EQ (response_payload.blocks.size (), 1); + ASSERT_TRUE (response_payload.blocks.front ()->hash () == request_payload.start); +} + +TEST (bootstrap_server, serve_end_of_chain) +{ + nano::test::system system{}; + auto & node = *system.add_node (); + + responses_helper responses; + node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) { + responses.add (response); + }); + + auto chains = setup_chains (system, node, 1, 128); + auto [account, blocks] = chains.front (); + + // Request blocks from account frontier + nano::asc_pull_req request{ node.network_params.network }; + request.id = 7; + request.type = nano::asc_pull_type::blocks; + + nano::asc_pull_req::blocks_payload request_payload; + request_payload.start = blocks.back ()->hash (); + request_payload.count = nano::bootstrap_server::max_blocks; + + request.payload = request_payload; + request.update_header (); + + node.network.inbound (request, nano::test::fake_channel (node)); + + ASSERT_TIMELY (5s, responses.size () == 1); + + auto response = responses.get ().front (); + // Ensure we got response exactly for what we asked for + ASSERT_EQ (response.id, 7); + ASSERT_EQ (response.type, nano::asc_pull_type::blocks); + + nano::asc_pull_ack::blocks_payload response_payload; + ASSERT_NO_THROW (response_payload = std::get (response.payload)); + // Response should contain only the last block from chain + ASSERT_EQ (response_payload.blocks.size (), 1); + ASSERT_EQ (*response_payload.blocks.front (), *blocks.back ()); +} + +TEST (bootstrap_server, serve_missing) +{ + nano::test::system system{}; + auto & node = *system.add_node (); + + responses_helper responses; + node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) { + responses.add (response); + }); + + auto chains = setup_chains (system, node, 1, 128); + + // Request blocks from account frontier + nano::asc_pull_req request{ node.network_params.network }; + request.id = 7; + request.type = nano::asc_pull_type::blocks; + + nano::asc_pull_req::blocks_payload request_payload; + request_payload.start = nano::test::random_hash (); + request_payload.count = nano::bootstrap_server::max_blocks; + + request.payload = request_payload; + request.update_header (); + + node.network.inbound (request, nano::test::fake_channel (node)); + + ASSERT_TIMELY (5s, responses.size () == 1); + + auto response = responses.get ().front (); + // Ensure we got response exactly for what we asked for + ASSERT_EQ (response.id, 7); + ASSERT_EQ (response.type, nano::asc_pull_type::blocks); + + nano::asc_pull_ack::blocks_payload response_payload; + ASSERT_NO_THROW (response_payload = std::get (response.payload)); + // There should be nothing sent + ASSERT_EQ (response_payload.blocks.size (), 0); +} + +TEST (bootstrap_server, serve_multiple) +{ + nano::test::system system{}; + auto & node = *system.add_node (); + + responses_helper responses; + node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) { + responses.add (response); + }); + + auto chains = setup_chains (system, node, 32, 16); + + { + // Request blocks from multiple chains at once + int next_id = 0; + for (auto & [account, blocks] : chains) + { + // Request blocks from account root + nano::asc_pull_req request{ node.network_params.network }; + request.id = next_id++; + request.type = nano::asc_pull_type::blocks; + + nano::asc_pull_req::blocks_payload request_payload; + request_payload.start = account; + request_payload.count = nano::bootstrap_server::max_blocks; + + request.payload = request_payload; + request.update_header (); + + node.network.inbound (request, nano::test::fake_channel (node)); + } + } + + ASSERT_TIMELY (15s, responses.size () == chains.size ()); + + auto all_responses = responses.get (); + { + int next_id = 0; + for (auto & [account, blocks] : chains) + { + // Find matching response + auto response_it = std::find_if (all_responses.begin (), all_responses.end (), [&] (auto ack) { return ack.id == next_id; }); + ASSERT_TRUE (response_it != all_responses.end ()); + auto response = *response_it; + + // Ensure we got response exactly for what we asked for + ASSERT_EQ (response.id, next_id); + ASSERT_EQ (response.type, nano::asc_pull_type::blocks); + + nano::asc_pull_ack::blocks_payload response_payload; + ASSERT_NO_THROW (response_payload = std::get (response.payload)); + ASSERT_EQ (response_payload.blocks.size (), 17); // 1 open block + 16 random blocks + ASSERT_TRUE (compare_blocks (response_payload.blocks, blocks)); + + ++next_id; + } + } + + // Ensure we don't get any unexpected responses + ASSERT_ALWAYS (1s, responses.size () == chains.size ()); +} + +TEST (bootstrap_server, serve_account_info) +{ + nano::test::system system{}; + auto & node = *system.add_node (); + + responses_helper responses; + node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) { + responses.add (response); + }); + + auto chains = setup_chains (system, node, 1, 128); + auto [account, blocks] = chains.front (); + + // Request blocks from account root + nano::asc_pull_req request{ node.network_params.network }; + request.id = 7; + request.type = nano::asc_pull_type::account_info; + + nano::asc_pull_req::account_info_payload request_payload; + request_payload.target = account; + + request.payload = request_payload; + request.update_header (); + + node.network.inbound (request, nano::test::fake_channel (node)); + + ASSERT_TIMELY (5s, responses.size () == 1); + + auto response = responses.get ().front (); + // Ensure we got response exactly for what we asked for + ASSERT_EQ (response.id, 7); + ASSERT_EQ (response.type, nano::asc_pull_type::account_info); + + nano::asc_pull_ack::account_info_payload response_payload; + ASSERT_NO_THROW (response_payload = std::get (response.payload)); + + ASSERT_EQ (response_payload.account, account); + ASSERT_EQ (response_payload.account_open, blocks.front ()->hash ()); + ASSERT_EQ (response_payload.account_head, blocks.back ()->hash ()); + ASSERT_EQ (response_payload.account_block_count, blocks.size ()); + ASSERT_EQ (response_payload.account_conf_frontier, blocks.back ()->hash ()); + ASSERT_EQ (response_payload.account_conf_height, blocks.size ()); + + // Ensure we don't get any unexpected responses + ASSERT_ALWAYS (1s, responses.size () == 1); +} + +TEST (bootstrap_server, serve_account_info_missing) +{ + nano::test::system system{}; + auto & node = *system.add_node (); + + responses_helper responses; + node.bootstrap_server.on_response.add ([&] (auto & response, auto & channel) { + responses.add (response); + }); + + auto chains = setup_chains (system, node, 1, 128); + auto [account, blocks] = chains.front (); + + // Request blocks from account root + nano::asc_pull_req request{ node.network_params.network }; + request.id = 7; + request.type = nano::asc_pull_type::account_info; + + nano::asc_pull_req::account_info_payload request_payload; + request_payload.target = nano::test::random_account (); + + request.payload = request_payload; + request.update_header (); + + node.network.inbound (request, nano::test::fake_channel (node)); + + ASSERT_TIMELY (5s, responses.size () == 1); + + auto response = responses.get ().front (); + // Ensure we got response exactly for what we asked for + ASSERT_EQ (response.id, 7); + ASSERT_EQ (response.type, nano::asc_pull_type::account_info); + + nano::asc_pull_ack::account_info_payload response_payload; + ASSERT_NO_THROW (response_payload = std::get (response.payload)); + + ASSERT_EQ (response_payload.account, request_payload.target); + ASSERT_EQ (response_payload.account_open, 0); + ASSERT_EQ (response_payload.account_head, 0); + ASSERT_EQ (response_payload.account_block_count, 0); + ASSERT_EQ (response_payload.account_conf_frontier, 0); + ASSERT_EQ (response_payload.account_conf_height, 0); + + // Ensure we don't get any unexpected responses + ASSERT_ALWAYS (1s, responses.size () == 1); +} \ No newline at end of file diff --git a/nano/core_test/message.cpp b/nano/core_test/message.cpp index 45fc7608..946be3bb 100644 --- a/nano/core_test/message.cpp +++ b/nano/core_test/message.cpp @@ -1,12 +1,30 @@ #include #include #include +#include #include #include #include +namespace +{ +std::shared_ptr random_block () +{ + nano::block_builder builder; + auto block = builder + .send () + .previous (nano::test::random_hash ()) + .destination (nano::keypair ().pub) + .balance (2) + .sign (nano::keypair ().prv, 4) + .work (5) + .build_shared (); + return block; +} +} + TEST (message, keepalive_serialization) { nano::keepalive request1{ nano::dev::network_params.network }; @@ -45,15 +63,7 @@ TEST (message, keepalive_deserialize) TEST (message, publish_serialization) { - nano::block_builder builder; - auto block = builder - .send () - .previous (0) - .destination (1) - .balance (2) - .sign (nano::keypair ().prv, 4) - .work (5) - .build_shared (); + auto block = random_block (); nano::publish publish{ nano::dev::network_params.network, block }; ASSERT_EQ (nano::block_type::send, publish.header.block_type ()); std::vector bytes; @@ -277,30 +287,181 @@ TEST (message, bulk_pull_serialization) ASSERT_TRUE (header.bulk_pull_ascending ()); } -TEST (message, keepalive_to_string) +TEST (message, asc_pull_req_serialization_blocks) { - nano::message_header hdr{ nano::dev::network_params.network, nano::message_type::keepalive }; - std::string expected = hdr.to_string (); + nano::asc_pull_req original{ nano::dev::network_params.network }; + original.id = 7; + original.type = nano::asc_pull_type::blocks; - nano::keepalive keepalive = nano::keepalive (nano::dev::network_params.network); - ASSERT_EQ (keepalive.to_string (), expected + "\n:::0\n:::0\n:::0\n:::0\n:::0\n:::0\n:::0\n:::0"); + nano::asc_pull_req::blocks_payload original_payload; + original_payload.start = nano::test::random_hash (); + original_payload.count = 111; - expected.append ("\n:::0"); + original.payload = original_payload; + original.update_header (); - keepalive.peers[1] = nano::endpoint{ boost::asio::ip::make_address_v6 ("::1"), 45 }; - expected.append ("\n::1:45"); - - keepalive.peers[2] = nano::endpoint{ boost::asio::ip::make_address_v6 ("2001:db8:85a3:8d3:1319:8a2e:370:7348"), 0 }; - expected.append ("\n2001:db8:85a3:8d3:1319:8a2e:370:7348:0"); - - keepalive.peers[3] = nano::endpoint{ boost::asio::ip::make_address_v6 ("::"), 65535 }; - expected.append ("\n:::65535"); - - for (int i = 4; i < keepalive.peers.size (); i++) + // Serialize + std::vector bytes; { - keepalive.peers[i] = nano::endpoint{ boost::asio::ip::make_address_v6 ("::ffff:1.2.3.4"), 1234 }; - expected.append ("\n::ffff:1.2.3.4:1234"); + nano::vectorstream stream{ bytes }; + original.serialize (stream); + } + nano::bufferstream stream{ bytes.data (), bytes.size () }; + + // Header + bool error = false; + nano::message_header header (error, stream); + ASSERT_FALSE (error); + ASSERT_EQ (nano::message_type::asc_pull_req, header.type); + + // Message + nano::asc_pull_req message (error, stream, header); + ASSERT_FALSE (error); + ASSERT_EQ (original.id, message.id); + ASSERT_EQ (original.type, message.type); + + nano::asc_pull_req::blocks_payload message_payload; + ASSERT_NO_THROW (message_payload = std::get (message.payload)); + ASSERT_EQ (original_payload.start, message_payload.start); + ASSERT_EQ (original_payload.count, message_payload.count); + + ASSERT_TRUE (nano::at_end (stream)); +} + +TEST (message, asc_pull_req_serialization_account_info) +{ + nano::asc_pull_req original{ nano::dev::network_params.network }; + original.id = 7; + original.type = nano::asc_pull_type::account_info; + + nano::asc_pull_req::account_info_payload original_payload; + original_payload.target = nano::test::random_hash (); + + original.payload = original_payload; + original.update_header (); + + // Serialize + std::vector bytes; + { + nano::vectorstream stream{ bytes }; + original.serialize (stream); + } + nano::bufferstream stream{ bytes.data (), bytes.size () }; + + // Header + bool error = false; + nano::message_header header (error, stream); + ASSERT_FALSE (error); + ASSERT_EQ (nano::message_type::asc_pull_req, header.type); + + // Message + nano::asc_pull_req message (error, stream, header); + ASSERT_FALSE (error); + ASSERT_EQ (original.id, message.id); + ASSERT_EQ (original.type, message.type); + + nano::asc_pull_req::account_info_payload message_payload; + ASSERT_NO_THROW (message_payload = std::get (message.payload)); + ASSERT_EQ (original_payload.target, message_payload.target); + + ASSERT_TRUE (nano::at_end (stream)); +} + +TEST (message, asc_pull_ack_serialization_blocks) +{ + nano::asc_pull_ack original{ nano::dev::network_params.network }; + original.id = 11; + original.type = nano::asc_pull_type::blocks; + + nano::asc_pull_ack::blocks_payload original_payload; + // Generate blocks + const int num_blocks = 128; // Maximum allowed + for (int n = 0; n < num_blocks; ++n) + { + original_payload.blocks.push_back (random_block ()); } - ASSERT_EQ (keepalive.to_string (), expected); + original.payload = original_payload; + original.update_header (); + + // Serialize + std::vector bytes; + { + nano::vectorstream stream{ bytes }; + original.serialize (stream); + } + nano::bufferstream stream{ bytes.data (), bytes.size () }; + + // Header + bool error = false; + nano::message_header header (error, stream); + ASSERT_FALSE (error); + ASSERT_EQ (nano::message_type::asc_pull_ack, header.type); + + // Message + nano::asc_pull_ack message (error, stream, header); + ASSERT_FALSE (error); + ASSERT_EQ (original.id, message.id); + ASSERT_EQ (original.type, message.type); + + nano::asc_pull_ack::blocks_payload message_payload; + ASSERT_NO_THROW (message_payload = std::get (message.payload)); + + // Compare blocks + ASSERT_EQ (original_payload.blocks.size (), message_payload.blocks.size ()); + ASSERT_TRUE (std::equal (original_payload.blocks.begin (), original_payload.blocks.end (), message_payload.blocks.begin (), message_payload.blocks.end (), [] (auto a, auto b) { + return *a == *b; + })); + + ASSERT_TRUE (nano::at_end (stream)); } + +TEST (message, asc_pull_ack_serialization_account_info) +{ + nano::asc_pull_ack original{ nano::dev::network_params.network }; + original.id = 11; + original.type = nano::asc_pull_type::account_info; + + nano::asc_pull_ack::account_info_payload original_payload; + original_payload.account = nano::test::random_account (); + original_payload.account_open = nano::test::random_hash (); + original_payload.account_head = nano::test::random_hash (); + original_payload.account_block_count = 932932132; + original_payload.account_conf_frontier = nano::test::random_hash (); + original_payload.account_conf_height = 847312; + + original.payload = original_payload; + original.update_header (); + + // Serialize + std::vector bytes; + { + nano::vectorstream stream{ bytes }; + original.serialize (stream); + } + nano::bufferstream stream{ bytes.data (), bytes.size () }; + + // Header + bool error = false; + nano::message_header header (error, stream); + ASSERT_FALSE (error); + ASSERT_EQ (nano::message_type::asc_pull_ack, header.type); + + // Message + nano::asc_pull_ack message (error, stream, header); + ASSERT_FALSE (error); + ASSERT_EQ (original.id, message.id); + ASSERT_EQ (original.type, message.type); + + nano::asc_pull_ack::account_info_payload message_payload; + ASSERT_NO_THROW (message_payload = std::get (message.payload)); + + ASSERT_EQ (original_payload.account, message_payload.account); + ASSERT_EQ (original_payload.account_open, message_payload.account_open); + ASSERT_EQ (original_payload.account_head, message_payload.account_head); + ASSERT_EQ (original_payload.account_block_count, message_payload.account_block_count); + ASSERT_EQ (original_payload.account_conf_frontier, message_payload.account_conf_frontier); + ASSERT_EQ (original_payload.account_conf_height, message_payload.account_conf_height); + + ASSERT_TRUE (nano::at_end (stream)); +} \ No newline at end of file diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index cefd96ad..5cfaf03f 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -1144,7 +1144,7 @@ TEST (network, bandwidth_limiter) ASSERT_TIMELY (1s, 1 == node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); // change the bandwidth settings, 2 packets will be dropped - node.network.set_bandwidth_params (1.1, message_size * 2); + node.set_bandwidth_params (message_size * 2, 1.1); channel1->send (message); channel2->send (message); channel1->send (message); @@ -1152,7 +1152,7 @@ TEST (network, bandwidth_limiter) ASSERT_TIMELY (1s, 3 == node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); // change the bandwidth settings, no packet will be dropped - node.network.set_bandwidth_params (4, message_size); + node.set_bandwidth_params (message_size, 4); channel1->send (message); channel2->send (message); channel1->send (message); diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index bef1a262..54de3507 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -152,6 +152,8 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.backup_before_upgrade, defaults.node.backup_before_upgrade); ASSERT_EQ (conf.node.bandwidth_limit, defaults.node.bandwidth_limit); ASSERT_EQ (conf.node.bandwidth_limit_burst_ratio, defaults.node.bandwidth_limit_burst_ratio); + ASSERT_EQ (conf.node.bootstrap_bandwidth_limit, defaults.node.bootstrap_bandwidth_limit); + ASSERT_EQ (conf.node.bootstrap_bandwidth_burst_ratio, defaults.node.bootstrap_bandwidth_burst_ratio); ASSERT_EQ (conf.node.block_processor_batch_max_time, defaults.node.block_processor_batch_max_time); ASSERT_EQ (conf.node.bootstrap_connections, defaults.node.bootstrap_connections); ASSERT_EQ (conf.node.bootstrap_connections_max, defaults.node.bootstrap_connections_max); @@ -392,6 +394,8 @@ TEST (toml, daemon_config_deserialize_no_defaults) backup_before_upgrade = true bandwidth_limit = 999 bandwidth_limit_burst_ratio = 999.9 + bootstrap_bandwidth_limit = 999 + bootstrap_bandwidth_burst_ratio = 999.9 block_processor_batch_max_time = 999 bootstrap_connections = 999 bootstrap_connections_max = 999 @@ -557,6 +561,8 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.backup_before_upgrade, defaults.node.backup_before_upgrade); ASSERT_NE (conf.node.bandwidth_limit, defaults.node.bandwidth_limit); ASSERT_NE (conf.node.bandwidth_limit_burst_ratio, defaults.node.bandwidth_limit_burst_ratio); + ASSERT_NE (conf.node.bootstrap_bandwidth_limit, defaults.node.bootstrap_bandwidth_limit); + ASSERT_NE (conf.node.bootstrap_bandwidth_burst_ratio, defaults.node.bootstrap_bandwidth_burst_ratio); ASSERT_NE (conf.node.block_processor_batch_max_time, defaults.node.block_processor_batch_max_time); ASSERT_NE (conf.node.bootstrap_connections, defaults.node.bootstrap_connections); ASSERT_NE (conf.node.bootstrap_connections_max, defaults.node.bootstrap_connections_max); diff --git a/nano/lib/blocks.cpp b/nano/lib/blocks.cpp index 28996148..51734fc1 100644 --- a/nano/lib/blocks.cpp +++ b/nano/lib/blocks.cpp @@ -1362,6 +1362,17 @@ std::shared_ptr nano::deserialize_block_json (boost::property_tree: return result; } +void nano::serialize_block_type (nano::stream & stream, const nano::block_type & type) +{ + nano::write (stream, type); +} + +void nano::serialize_block (nano::stream & stream_a, nano::block const & block_a) +{ + nano::serialize_block_type (stream_a, block_a.type ()); + block_a.serialize (stream_a); +} + std::shared_ptr nano::deserialize_block (nano::stream & stream_a) { nano::block_type type; @@ -1404,6 +1415,11 @@ std::shared_ptr nano::deserialize_block (nano::stream & stream_a, n result = ::deserialize_block (stream_a); break; } + case nano::block_type::not_a_block: + { + // Skip null block terminators + return {}; + } default: #ifndef NANO_FUZZER_TEST debug_assert (false); diff --git a/nano/lib/blocks.hpp b/nano/lib/blocks.hpp index 3d579627..97770f01 100644 --- a/nano/lib/blocks.hpp +++ b/nano/lib/blocks.hpp @@ -417,6 +417,14 @@ std::unique_ptr collect_container_info (block_uniquer std::shared_ptr deserialize_block (nano::stream &); std::shared_ptr deserialize_block (nano::stream &, nano::block_type, nano::block_uniquer * = nullptr); std::shared_ptr deserialize_block_json (boost::property_tree::ptree const &, nano::block_uniquer * = nullptr); +/** + * Serialize block type as an 8-bit value + */ +void serialize_block_type (nano::stream &, nano::block_type const &); +/** + * Serialize a block prefixed with an 8-bit typecode + */ void serialize_block (nano::stream &, nano::block const &); + void block_memory_pool_purge (); } diff --git a/nano/lib/processing_queue.hpp b/nano/lib/processing_queue.hpp index c5ffdb37..0fce61ed 100644 --- a/nano/lib/processing_queue.hpp +++ b/nano/lib/processing_queue.hpp @@ -69,12 +69,12 @@ public: /** * Queues item for batch processing */ - void add (T const & item) + void add (T && item) { nano::unique_lock lock{ mutex }; if (queue.size () < max_queue_size) { - queue.emplace_back (item); + queue.emplace_back (std::forward (item)); lock.unlock (); condition.notify_one (); stats.inc (stat_type, nano::stat::detail::queue); @@ -131,7 +131,7 @@ private: for (int n = 0; n < max_batch_size; ++n) { debug_assert (!queue.empty ()); - queue_l.emplace_back (queue.front ()); + queue_l.emplace_back (std::move (queue.front ())); queue.pop_front (); } return queue_l; diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index cd089cdd..c7e1bfce 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -551,6 +551,15 @@ std::string nano::stat::type_to_string (stat::type type) case nano::stat::type::blockprocessor: res = "blockprocessor"; break; + case nano::stat::type::bootstrap_server: + res = "bootstrap_server"; + break; + case nano::stat::type::bootstrap_server_requests: + res = "bootstrap_server_requests"; + break; + case nano::stat::type::bootstrap_server_responses: + res = "bootstrap_server_responses"; + break; } return res; } @@ -731,6 +740,12 @@ std::string nano::stat::detail_to_string (stat::detail detail) case nano::stat::detail::telemetry_ack: res = "telemetry_ack"; break; + case nano::stat::detail::asc_pull_req: + res = "asc_pull_req"; + break; + case nano::stat::detail::asc_pull_ack: + res = "asc_pull_ack"; + break; case nano::stat::detail::state_block: res = "state_block"; break; @@ -889,6 +904,12 @@ std::string nano::stat::detail_to_string (stat::detail detail) case nano::stat::detail::invalid_frontier_req_message: res = "invalid_frontier_req_message"; break; + case nano::stat::detail::invalid_asc_pull_req_message: + res = "invalid_asc_pull_req_message"; + break; + case nano::stat::detail::invalid_asc_pull_ack_message: + res = "invalid_asc_pull_ack_message"; + break; case nano::stat::detail::message_too_big: res = "message_too_big"; break; @@ -988,6 +1009,33 @@ std::string nano::stat::detail_to_string (stat::detail detail) case nano::stat::detail::missing_block: res = "missing_block"; break; + case nano::stat::detail::response: + res = "response"; + break; + case nano::stat::detail::write_drop: + res = "write_drop"; + break; + case nano::stat::detail::write_error: + res = "write_error"; + break; + case nano::stat::detail::blocks: + res = "blocks"; + break; + case nano::stat::detail::drop: + res = "drop"; + break; + case nano::stat::detail::bad_count: + res = "bad_count"; + break; + case nano::stat::detail::response_blocks: + res = "response_blocks"; + break; + case nano::stat::detail::response_account_info: + res = "response_account_info"; + break; + case nano::stat::detail::channel_full: + res = "channel_full"; + break; } return res; } diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index 565315e8..aa462a1e 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -247,6 +247,9 @@ public: vote_cache, hinting, blockprocessor, + bootstrap_server, + bootstrap_server_requests, + bootstrap_server_responses, }; /** Optional detail type */ @@ -304,6 +307,8 @@ public: node_id_handshake, telemetry_req, telemetry_ack, + asc_pull_req, + asc_pull_ack, // bootstrap, callback initiate, @@ -369,6 +374,8 @@ public: invalid_bulk_pull_message, invalid_bulk_pull_account_message, invalid_frontier_req_message, + invalid_asc_pull_req_message, + invalid_asc_pull_ack_message, message_too_big, outdated_version, udp_max_per_ip, @@ -435,6 +442,17 @@ public: hinted, insert_failed, missing_block, + + // bootstrap server + response, + write_drop, + write_error, + blocks, + drop, + bad_count, + response_blocks, + response_account_info, + channel_full, }; /** Direction of the stat. If the direction is irrelevant, use in */ diff --git a/nano/lib/stream.hpp b/nano/lib/stream.hpp index 8fe0be5b..77bb8618 100644 --- a/nano/lib/stream.hpp +++ b/nano/lib/stream.hpp @@ -2,12 +2,15 @@ #include +#include + #include namespace nano { // We operate on streams of uint8_t by convention using stream = std::basic_streambuf; + // Read a raw byte stream the size of `T' and fill value. Returns true if there was an error, false otherwise template bool try_read (nano::stream & stream_a, T & value_a) @@ -16,6 +19,7 @@ bool try_read (nano::stream & stream_a, T & value_a) auto amount_read (stream_a.sgetn (reinterpret_cast (&value_a), sizeof (value_a))); return amount_read != sizeof (value_a); } + // A wrapper of try_read which throws if there is an error template void read (nano::stream & stream_a, T & value) @@ -51,4 +55,28 @@ inline void write (nano::stream & stream_a, std::vector const & value_a (void)amount_written; debug_assert (amount_written == value_a.size ()); } + +inline bool at_end (nano::stream & stream) +{ + uint8_t junk; + auto end (nano::try_read (stream, junk)); + return end; +} + +/* + * We use big endian as standard for all network communications + */ +template +void write_big_endian (nano::stream & stream, T const & value) +{ + nano::write (stream, boost::endian::native_to_big (value)); +} + +template +void read_big_endian (nano::stream & stream, T & value) +{ + T tmp; + nano::read (stream, tmp); + value = boost::endian::big_to_native (tmp); +} } diff --git a/nano/lib/threading.cpp b/nano/lib/threading.cpp index ff6b74c2..db004041 100644 --- a/nano/lib/threading.cpp +++ b/nano/lib/threading.cpp @@ -102,6 +102,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::vote_generator_queue: thread_role_name_string = "Voting que"; break; + case nano::thread_role::name::bootstrap_server: + thread_role_name_string = "Bootstrp serv"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } @@ -322,4 +325,17 @@ unsigned int nano::hardware_concurrency () return std::thread::hardware_concurrency (); } return value; +} + +bool nano::join_or_pass (std::thread & thread) +{ + if (thread.joinable ()) + { + thread.join (); + return true; + } + else + { + return false; + } } \ No newline at end of file diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index 28f3c408..20b6c7c2 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -46,6 +46,7 @@ namespace thread_role backlog_population, election_hinting, vote_generator_queue, + bootstrap_server, }; /* @@ -203,8 +204,13 @@ private: std::unique_ptr collect_container_info (thread_pool & thread_pool, std::string const & name); -/* +/** * Number of available logical processor cores. Might be overridden by setting `NANO_HARDWARE_CONCURRENCY` environment variable */ unsigned int hardware_concurrency (); + +/** + * If thread is joinable joins it, otherwise does nothing + */ +bool join_or_pass (std::thread &); } diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 04292044..4202a0ac 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -18,6 +18,8 @@ add_library( active_transactions.cpp backlog_population.hpp backlog_population.cpp + bandwidth_limiter.hpp + bandwidth_limiter.cpp block_arrival.hpp block_arrival.cpp blockprocessor.hpp @@ -40,6 +42,8 @@ add_library( bootstrap/bootstrap_legacy.cpp bootstrap/bootstrap.hpp bootstrap/bootstrap.cpp + bootstrap/bootstrap_server.hpp + bootstrap/bootstrap_server.cpp cli.hpp cli.cpp common.hpp diff --git a/nano/node/bandwidth_limiter.cpp b/nano/node/bandwidth_limiter.cpp new file mode 100644 index 00000000..581a9e56 --- /dev/null +++ b/nano/node/bandwidth_limiter.cpp @@ -0,0 +1,56 @@ +#include +#include + +/* + * bandwidth_limiter + */ + +nano::bandwidth_limiter::bandwidth_limiter (std::size_t limit_a, double burst_ratio_a) : + bucket (static_cast (limit_a * burst_ratio_a), limit_a) +{ +} + +bool nano::bandwidth_limiter::should_pass (std::size_t message_size_a) +{ + return bucket.try_consume (nano::narrow_cast (message_size_a)); +} + +void nano::bandwidth_limiter::reset (std::size_t limit_a, double burst_ratio_a) +{ + bucket.reset (static_cast (limit_a * burst_ratio_a), limit_a); +} + +/* + * outbound_bandwidth_limiter + */ + +nano::outbound_bandwidth_limiter::outbound_bandwidth_limiter (nano::outbound_bandwidth_limiter::config config_a) : + config_m{ config_a }, + limiter_standard (config_m.standard_limit, config_m.standard_burst_ratio), + limiter_bootstrap{ config_m.bootstrap_limit, config_m.bootstrap_burst_ratio } +{ +} + +nano::bandwidth_limiter & nano::outbound_bandwidth_limiter::select_limiter (nano::bandwidth_limit_type type) +{ + switch (type) + { + case bandwidth_limit_type::bootstrap: + return limiter_bootstrap; + default: + return limiter_standard; + } + debug_assert (false); +} + +bool nano::outbound_bandwidth_limiter::should_pass (std::size_t buffer_size, nano::bandwidth_limit_type type) +{ + auto & limiter = select_limiter (type); + return limiter.should_pass (buffer_size); +} + +void nano::outbound_bandwidth_limiter::reset (std::size_t limit, double burst_ratio, nano::bandwidth_limit_type type) +{ + auto & limiter = select_limiter (type); + limiter.reset (limit, burst_ratio); +} \ No newline at end of file diff --git a/nano/node/bandwidth_limiter.hpp b/nano/node/bandwidth_limiter.hpp new file mode 100644 index 00000000..5a7f1f76 --- /dev/null +++ b/nano/node/bandwidth_limiter.hpp @@ -0,0 +1,73 @@ +#pragma once + +#include + +namespace nano +{ +/** + * Enumeration for different bandwidth limits for different traffic types + */ +enum class bandwidth_limit_type +{ + /** For all message */ + standard, + /** For bootstrap (asc_pull_ack, asc_pull_req) traffic */ + bootstrap +}; + +/** + * Class that tracks and manages bandwidth limits for IO operations + */ +class bandwidth_limiter final +{ +public: + // initialize with limit 0 = unbounded + bandwidth_limiter (std::size_t limit, double burst_ratio); + + bool should_pass (std::size_t buffer_size); + void reset (std::size_t limit, double burst_ratio); + +private: + nano::rate::token_bucket bucket; +}; + +class outbound_bandwidth_limiter final +{ +public: // Config + struct config + { + // standard + std::size_t standard_limit; + double standard_burst_ratio; + // bootstrap + std::size_t bootstrap_limit; + double bootstrap_burst_ratio; + }; + +public: + explicit outbound_bandwidth_limiter (config); + + /** + * Check whether packet falls withing bandwidth limits and should be allowed + * @return true if OK, false if needs to be dropped + */ + bool should_pass (std::size_t buffer_size, bandwidth_limit_type); + /** + * Reset limits of selected limiter type to values passed in arguments + */ + void reset (std::size_t limit, double burst_ratio, bandwidth_limit_type = bandwidth_limit_type::standard); + +private: + /** + * Returns reference to limiter corresponding to the limit type + */ + bandwidth_limiter & select_limiter (bandwidth_limit_type); + +private: + const config config_m; + +private: + bandwidth_limiter limiter_standard; + bandwidth_limiter limiter_bootstrap; +}; +} \ No newline at end of file diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp new file mode 100644 index 00000000..2358c51a --- /dev/null +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -0,0 +1,291 @@ +#include +#include +#include +#include + +#include + +// TODO: Make threads configurable +nano::bootstrap_server::bootstrap_server (nano::store & store_a, nano::ledger & ledger_a, nano::network_constants const & network_constants_a, nano::stat & stats_a) : + store{ store_a }, + ledger{ ledger_a }, + network_constants{ network_constants_a }, + stats{ stats_a }, + request_queue{ stats, nano::stat::type::bootstrap_server, nano::thread_role::name::bootstrap_server, /* threads */ 1, /* max size */ 1024 * 16, /* max batch */ 128 } +{ + request_queue.process_batch = [this] (auto & batch) { + process_batch (batch); + }; +} + +nano::bootstrap_server::~bootstrap_server () +{ + stop (); +} + +void nano::bootstrap_server::start () +{ + request_queue.start (); +} + +void nano::bootstrap_server::stop () +{ + request_queue.stop (); +} + +bool nano::bootstrap_server::verify_request_type (nano::asc_pull_type type) const +{ + switch (type) + { + case asc_pull_type::invalid: + return false; + case asc_pull_type::blocks: + case asc_pull_type::account_info: + return true; + } + return false; +} + +bool nano::bootstrap_server::verify (const nano::asc_pull_req & message) const +{ + if (!verify_request_type (message.type)) + { + return false; + } + + struct verify_visitor + { + bool operator() (nano::empty_payload const &) const + { + return false; + } + bool operator() (nano::asc_pull_req::blocks_payload const & pld) const + { + return pld.count > 0 && pld.count <= max_blocks; + } + bool operator() (nano::asc_pull_req::account_info_payload const & pld) const + { + return !pld.target.is_zero (); + } + }; + + return std::visit (verify_visitor{}, message.payload); +} + +bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::shared_ptr channel) +{ + if (!verify (message)) + { + stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::invalid); + return false; + } + + // If channel is full our response will be dropped anyway, so filter that early + // TODO: Add per channel limits (this ideally should be done on the channel message processing side) + if (channel->max ()) + { + stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::in); + return false; + } + + request_queue.add (std::make_pair (message, channel)); + return true; +} + +void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared_ptr & channel) +{ + stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::response, nano::stat::dir::out); + + // Increase relevant stats depending on payload type + struct stat_visitor + { + nano::stat & stats; + + void operator() (nano::empty_payload const &) + { + debug_assert (false, "missing payload"); + } + void operator() (nano::asc_pull_ack::blocks_payload const & pld) + { + stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::response_blocks, nano::stat::dir::out); + stats.add (nano::stat::type::bootstrap_server, nano::stat::detail::blocks, nano::stat::dir::out, pld.blocks.size ()); + } + void operator() (nano::asc_pull_ack::account_info_payload const & pld) + { + stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::response_account_info, nano::stat::dir::out); + } + }; + std::visit (stat_visitor{ stats }, response.payload); + + on_response.notify (response, channel); + + channel->send ( + response, [this] (auto & ec, auto size) { + if (ec) + { + stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::write_error, nano::stat::dir::out); + } + }, + nano::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap); +} + +/* + * Requests + */ + +void nano::bootstrap_server::process_batch (std::deque & batch) +{ + auto transaction = store.tx_begin_read (); + + for (auto & [request, channel] : batch) + { + if (!channel->max ()) + { + auto response = process (transaction, request); + respond (response, channel); + } + else + { + stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::out); + } + } +} + +nano::asc_pull_ack nano::bootstrap_server::process (nano::transaction const & transaction, const nano::asc_pull_req & message) +{ + return std::visit ([this, &transaction, &message] (auto && request) { return process (transaction, message.id, request); }, message.payload); +} + +nano::asc_pull_ack nano::bootstrap_server::process (const nano::transaction &, nano::asc_pull_req::id_t id, const nano::empty_payload & request) +{ + // Empty payload should never be possible, but return empty response anyway + debug_assert (false, "missing payload"); + nano::asc_pull_ack response{ network_constants }; + response.id = id; + response.type = nano::asc_pull_type::invalid; + return response; +} + +/* + * Blocks response + */ + +nano::asc_pull_ack nano::bootstrap_server::process (nano::transaction const & transaction, nano::asc_pull_req::id_t id, nano::asc_pull_req::blocks_payload const & request) +{ + const std::size_t count = std::min (static_cast (request.count), max_blocks); + + // `start` can represent either account or block hash + if (store.block.exists (transaction, request.start.as_block_hash ())) + { + return prepare_response (transaction, id, request.start.as_block_hash (), count); + } + if (store.account.exists (transaction, request.start.as_account ())) + { + auto info = store.account.get (transaction, request.start.as_account ()); + if (info) + { + // Start from open block if pulling by account + return prepare_response (transaction, id, info->open_block, count); + } + else + { + debug_assert (false, "account exists but cannot be retrieved"); + } + } + + // Neither block nor account found, send empty response to indicate that + return prepare_empty_blocks_response (id); +} + +nano::asc_pull_ack nano::bootstrap_server::prepare_response (nano::transaction const & transaction, nano::asc_pull_req::id_t id, nano::block_hash start_block, std::size_t count) +{ + debug_assert (count <= max_blocks); + + auto blocks = prepare_blocks (transaction, start_block, count); + debug_assert (blocks.size () <= count); + + nano::asc_pull_ack response{ network_constants }; + response.id = id; + response.type = nano::asc_pull_type::blocks; + + nano::asc_pull_ack::blocks_payload response_payload; + response_payload.blocks = blocks; + response.payload = response_payload; + + response.update_header (); + return response; +} + +nano::asc_pull_ack nano::bootstrap_server::prepare_empty_blocks_response (nano::asc_pull_req::id_t id) +{ + nano::asc_pull_ack response{ network_constants }; + response.id = id; + response.type = nano::asc_pull_type::blocks; + + nano::asc_pull_ack::blocks_payload empty_payload{}; + response.payload = empty_payload; + + response.update_header (); + return response; +} + +std::vector> nano::bootstrap_server::prepare_blocks (nano::transaction const & transaction, nano::block_hash start_block, std::size_t count) const +{ + debug_assert (count <= max_blocks); + + std::vector> result; + if (!start_block.is_zero ()) + { + std::shared_ptr current = store.block.get (transaction, start_block); + while (current && result.size () < count) + { + result.push_back (current); + + auto successor = current->sideband ().successor; + current = store.block.get (transaction, successor); + } + } + return result; +} + +/* + * Account info response + */ + +nano::asc_pull_ack nano::bootstrap_server::process (const nano::transaction & transaction, nano::asc_pull_req::id_t id, const nano::asc_pull_req::account_info_payload & request) +{ + nano::asc_pull_ack response{ network_constants }; + response.id = id; + response.type = nano::asc_pull_type::account_info; + + auto target = request.target.as_account (); + // Try to lookup account assuming target is block hash + if (auto account_from_hash = ledger.account_safe (transaction, request.target.as_block_hash ()); !account_from_hash.is_zero ()) + { + target = account_from_hash; + } + // Otherwise assume target is an actual account + + nano::asc_pull_ack::account_info_payload response_payload{}; + response_payload.account = target; + + auto account_info = store.account.get (transaction, target); + if (account_info) + { + response_payload.account_open = account_info->open_block; + response_payload.account_head = account_info->head; + response_payload.account_block_count = account_info->block_count; + + auto conf_info = store.confirmation_height.get (transaction, target); + if (conf_info) + { + response_payload.account_conf_frontier = conf_info->frontier; + response_payload.account_conf_height = conf_info->height; + } + } + // If account is missing the response payload will contain all 0 fields, except for the target + + response.payload = response_payload; + response.update_header (); + return response; +} \ No newline at end of file diff --git a/nano/node/bootstrap/bootstrap_server.hpp b/nano/node/bootstrap/bootstrap_server.hpp new file mode 100644 index 00000000..2a111511 --- /dev/null +++ b/nano/node/bootstrap/bootstrap_server.hpp @@ -0,0 +1,87 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace nano +{ +class ledger; +namespace transport +{ + class channel; +} + +/** + * Processes bootstrap requests (`asc_pull_req` messages) and replies with bootstrap responses (`asc_pull_ack`) + * + * In order to ensure maximum throughput, there are two internal processing queues: + * - One for doing ledger lookups and preparing responses (`request_queue`) + * - One for sending back those responses over the network (`response_queue`) + */ +class bootstrap_server final +{ +public: + // `asc_pull_req` message is small, store by value + using request_t = std::pair>; // + +public: + bootstrap_server (nano::store &, nano::ledger &, nano::network_constants const &, nano::stat &); + ~bootstrap_server (); + + void start (); + void stop (); + + /** + * Process `asc_pull_req` message coming from network. + * Reply will be sent back over passed in `channel` + */ + bool request (nano::asc_pull_req const & message, std::shared_ptr channel); + +public: // Events + nano::observer_set &> on_response; + +private: + void process_batch (std::deque & batch); + nano::asc_pull_ack process (nano::transaction const &, nano::asc_pull_req const & message); + void respond (nano::asc_pull_ack &, std::shared_ptr &); + + nano::asc_pull_ack process (nano::transaction const &, nano::asc_pull_req::id_t id, nano::empty_payload const & request); + + /* + * Blocks response + */ + nano::asc_pull_ack process (nano::transaction const &, nano::asc_pull_req::id_t id, nano::asc_pull_req::blocks_payload const & request); + nano::asc_pull_ack prepare_response (nano::transaction const &, nano::asc_pull_req::id_t id, nano::block_hash start_block, std::size_t count); + nano::asc_pull_ack prepare_empty_blocks_response (nano::asc_pull_req::id_t id); + std::vector> prepare_blocks (nano::transaction const &, nano::block_hash start_block, std::size_t count) const; + + /* + * Account info response + */ + nano::asc_pull_ack process (nano::transaction const &, nano::asc_pull_req::id_t id, nano::asc_pull_req::account_info_payload const & request); + + /* + * Checks if the request should be dropped early on + */ + bool verify (nano::asc_pull_req const & message) const; + bool verify_request_type (nano::asc_pull_type) const; + +private: // Dependencies + nano::store & store; + nano::ledger & ledger; + nano::network_constants const & network_constants; + nano::stat & stats; + +private: + processing_queue request_queue; + +public: // Config + /** Maximum number of blocks to send in a single response, cannot be higher than capacity of a single `asc_pull_ack` message */ + constexpr static std::size_t max_blocks = nano::asc_pull_ack::blocks_payload::max_blocks; +}; +} \ No newline at end of file diff --git a/nano/node/messages.cpp b/nano/node/messages.cpp index fff5cb73..f6048f0f 100644 --- a/nano/node/messages.cpp +++ b/nano/node/messages.cpp @@ -72,9 +72,9 @@ bool nano::message_header::deserialize (nano::stream & stream_a) return error; } -std::string nano::to_string (nano::message_type message_type_l) +std::string nano::to_string (nano::message_type type) { - switch (message_type_l) + switch (type) { case nano::message_type::invalid: return "invalid"; @@ -102,15 +102,19 @@ std::string nano::to_string (nano::message_type message_type_l) return "telemetry_req"; case nano::message_type::telemetry_ack: return "telemetry_ack"; + case nano::message_type::asc_pull_req: + return "asc_pull_req"; + case nano::message_type::asc_pull_ack: + return "asc_pull_ack"; // default case intentionally omitted to cause warnings for unhandled enums } return "n/a"; } -nano::stat::detail nano::to_stat_detail (nano::message_type message_type) +nano::stat::detail nano::to_stat_detail (nano::message_type type) { - switch (message_type) + switch (type) { case nano::message_type::invalid: return nano::stat::detail::invalid; @@ -138,6 +142,10 @@ nano::stat::detail nano::to_stat_detail (nano::message_type message_type) return nano::stat::detail::telemetry_req; case nano::message_type::telemetry_ack: return nano::stat::detail::telemetry_ack; + case nano::message_type::asc_pull_req: + return nano::stat::detail::asc_pull_req; + case nano::message_type::asc_pull_ack: + return nano::stat::detail::asc_pull_ack; // default case intentionally omitted to cause warnings for unhandled enums } debug_assert (false); @@ -304,6 +312,14 @@ std::size_t nano::message_header::payload_length_bytes () const { return nano::telemetry_ack::size (*this); } + case nano::message_type::asc_pull_req: + { + return nano::asc_pull_req::size (*this); + } + case nano::message_type::asc_pull_ack: + { + return nano::asc_pull_ack::size (*this); + } default: { debug_assert (false); @@ -327,6 +343,8 @@ bool nano::message_header::is_valid_message_type () const case nano::message_type::confirm_req: case nano::message_type::node_id_handshake: case nano::message_type::telemetry_ack: + case nano::message_type::asc_pull_req: + case nano::message_type::asc_pull_ack: { return true; } @@ -364,6 +382,11 @@ nano::shared_const_buffer nano::message::to_shared_const_buffer () const return shared_const_buffer (to_bytes ()); } +nano::message_type nano::message::type () const +{ + return header.type; +} + /* * message_parser */ @@ -1596,4 +1619,321 @@ std::size_t nano::node_id_handshake::size (nano::message_header const & header_a result += sizeof (nano::account) + sizeof (nano::signature); } return result; +} + +/* + * asc_pull_req + */ + +nano::asc_pull_req::asc_pull_req (const nano::network_constants & constants) : + message (constants, nano::message_type::asc_pull_req) +{ +} + +nano::asc_pull_req::asc_pull_req (bool & error, nano::stream & stream, const nano::message_header & header) : + message (header) +{ + error = deserialize (stream); +} + +void nano::asc_pull_req::visit (nano::message_visitor & visitor) const +{ + visitor.asc_pull_req (*this); +} + +void nano::asc_pull_req::serialize (nano::stream & stream) const +{ + header.serialize (stream); + nano::write (stream, type); + nano::write_big_endian (stream, id); + + serialize_payload (stream); +} + +bool nano::asc_pull_req::deserialize (nano::stream & stream) +{ + debug_assert (header.type == nano::message_type::asc_pull_req); + bool error = false; + try + { + nano::read (stream, type); + nano::read_big_endian (stream, id); + + deserialize_payload (stream); + } + catch (std::runtime_error const &) + { + error = true; + } + return error; +} + +void nano::asc_pull_req::serialize_payload (nano::stream & stream) const +{ + debug_assert (verify_consistency ()); + + std::visit ([&stream] (auto && pld) { pld.serialize (stream); }, payload); +} + +void nano::asc_pull_req::deserialize_payload (nano::stream & stream) +{ + switch (type) + { + case asc_pull_type::blocks: + { + blocks_payload pld; + pld.deserialize (stream); + payload = pld; + break; + } + case asc_pull_type::account_info: + { + account_info_payload pld; + pld.deserialize (stream); + payload = pld; + break; + } + default: + throw std::runtime_error ("Unknown asc_pull_type"); + } +} + +void nano::asc_pull_req::update_header () +{ + std::vector bytes; + { + nano::vectorstream payload_stream (bytes); + serialize_payload (payload_stream); + } + debug_assert (bytes.size () <= 65535u); // Max int16 for storing size + debug_assert (bytes.size () >= 1); + header.extensions = std::bitset<16> (bytes.size ()); +} + +std::size_t nano::asc_pull_req::size (const nano::message_header & header) +{ + uint16_t payload_length = nano::narrow_cast (header.extensions.to_ulong ()); + return partial_size + payload_length; +} + +bool nano::asc_pull_req::verify_consistency () const +{ + struct consistency_visitor + { + nano::asc_pull_type type; + + void operator() (empty_payload) const + { + debug_assert (false, "missing payload"); + } + void operator() (blocks_payload) const + { + debug_assert (type == asc_pull_type::blocks); + } + void operator() (account_info_payload) const + { + debug_assert (type == asc_pull_type::account_info); + } + }; + std::visit (consistency_visitor{ type }, payload); + return true; // Just for convenience of calling from asserts +} + +/* + * asc_pull_req::blocks_payload + */ + +void nano::asc_pull_req::blocks_payload::serialize (nano::stream & stream) const +{ + nano::write (stream, start); + nano::write (stream, count); +} + +void nano::asc_pull_req::blocks_payload::deserialize (nano::stream & stream) +{ + nano::read (stream, start); + nano::read (stream, count); +} + +/* + * asc_pull_req::account_info_payload + */ + +void nano::asc_pull_req::account_info_payload::serialize (stream & stream) const +{ + nano::write (stream, target); +} + +void nano::asc_pull_req::account_info_payload::deserialize (stream & stream) +{ + nano::read (stream, target); +} + +/* + * asc_pull_ack + */ + +nano::asc_pull_ack::asc_pull_ack (const nano::network_constants & constants) : + message (constants, nano::message_type::asc_pull_ack) +{ +} + +nano::asc_pull_ack::asc_pull_ack (bool & error, nano::stream & stream, const nano::message_header & header) : + message (header) +{ + error = deserialize (stream); +} + +void nano::asc_pull_ack::visit (nano::message_visitor & visitor) const +{ + visitor.asc_pull_ack (*this); +} + +void nano::asc_pull_ack::serialize (nano::stream & stream) const +{ + debug_assert (header.extensions.to_ulong () > 0); // Block payload must have least `not_a_block` terminator + header.serialize (stream); + nano::write (stream, type); + nano::write_big_endian (stream, id); + + serialize_payload (stream); +} + +bool nano::asc_pull_ack::deserialize (nano::stream & stream) +{ + debug_assert (header.type == nano::message_type::asc_pull_ack); + bool error = false; + try + { + nano::read (stream, type); + nano::read_big_endian (stream, id); + + deserialize_payload (stream); + } + catch (std::runtime_error const &) + { + error = true; + } + return error; +} + +void nano::asc_pull_ack::serialize_payload (nano::stream & stream) const +{ + debug_assert (verify_consistency ()); + + std::visit ([&stream] (auto && pld) { pld.serialize (stream); }, payload); +} + +void nano::asc_pull_ack::deserialize_payload (nano::stream & stream) +{ + switch (type) + { + case asc_pull_type::blocks: + { + blocks_payload pld; + pld.deserialize (stream); + payload = pld; + break; + } + case asc_pull_type::account_info: + { + account_info_payload pld; + pld.deserialize (stream); + payload = pld; + break; + } + default: + throw std::runtime_error ("Unknown asc_pull_type"); + } +} + +void nano::asc_pull_ack::update_header () +{ + std::vector bytes; + { + nano::vectorstream payload_stream (bytes); + serialize_payload (payload_stream); + } + debug_assert (bytes.size () <= 65535u); // Max int16 for storing size + debug_assert (bytes.size () >= 1); + header.extensions = std::bitset<16> (bytes.size ()); +} + +std::size_t nano::asc_pull_ack::size (const nano::message_header & header) +{ + uint16_t payload_length = nano::narrow_cast (header.extensions.to_ulong ()); + return partial_size + payload_length; +} + +bool nano::asc_pull_ack::verify_consistency () const +{ + struct consistency_visitor + { + nano::asc_pull_type type; + + void operator() (empty_payload) const + { + debug_assert (false, "missing payload"); + } + void operator() (blocks_payload) const + { + debug_assert (type == asc_pull_type::blocks); + } + void operator() (account_info_payload) const + { + debug_assert (type == asc_pull_type::account_info); + } + }; + std::visit (consistency_visitor{ type }, payload); + return true; // Just for convenience of calling from asserts +} + +/* + * asc_pull_ack::blocks_payload + */ + +void nano::asc_pull_ack::blocks_payload::serialize (nano::stream & stream) const +{ + debug_assert (blocks.size () <= max_blocks); + for (auto & block : blocks) + { + debug_assert (block != nullptr); + nano::serialize_block (stream, *block); + } + // For convenience, end with null block terminator + nano::serialize_block_type (stream, nano::block_type::not_a_block); +} + +void nano::asc_pull_ack::blocks_payload::deserialize (nano::stream & stream) +{ + auto current = nano::deserialize_block (stream); + while (current && blocks.size () < max_blocks) + { + blocks.push_back (current); + current = nano::deserialize_block (stream); + } +} + +/* + * asc_pull_ack::account_info_payload + */ + +void nano::asc_pull_ack::account_info_payload::serialize (nano::stream & stream) const +{ + nano::write (stream, account); + nano::write (stream, account_open); + nano::write (stream, account_head); + nano::write_big_endian (stream, account_block_count); + nano::write (stream, account_conf_frontier); + nano::write_big_endian (stream, account_conf_height); +} + +void nano::asc_pull_ack::account_info_payload::deserialize (nano::stream & stream) +{ + nano::read (stream, account); + nano::read (stream, account_open); + nano::read (stream, account_head); + nano::read_big_endian (stream, account_block_count); + nano::read (stream, account_conf_frontier); + nano::read_big_endian (stream, account_conf_height); } \ No newline at end of file diff --git a/nano/node/messages.hpp b/nano/node/messages.hpp index f0a6cb32..86dd4b32 100644 --- a/nano/node/messages.hpp +++ b/nano/node/messages.hpp @@ -12,6 +12,7 @@ #include #include +#include namespace nano { @@ -34,7 +35,9 @@ enum class message_type : uint8_t node_id_handshake = 0x0a, bulk_pull_account = 0x0b, telemetry_req = 0x0c, - telemetry_ack = 0x0d + telemetry_ack = 0x0d, + asc_pull_req = 0x0e, + asc_pull_ack = 0x0f, }; std::string to_string (message_type); @@ -98,11 +101,15 @@ public: explicit message (nano::network_constants const &, nano::message_type); explicit message (nano::message_header const &); virtual ~message () = default; + virtual void serialize (nano::stream &) const = 0; virtual void visit (nano::message_visitor &) const = 0; std::shared_ptr> to_bytes () const; nano::shared_const_buffer to_shared_const_buffer () const; + nano::message_type type () const; + +public: nano::message_header header; }; @@ -349,6 +356,173 @@ public: static std::size_t size (nano::message_header const &); }; +/** + * Type of requested asc pull data + * - blocks: + * - account_info: + */ +enum class asc_pull_type : uint8_t +{ + invalid = 0x0, + blocks = 0x1, + account_info = 0x2, +}; + +class empty_payload +{ +public: + void serialize (nano::stream &) const + { + debug_assert (false); + } + void deserialize (nano::stream &) + { + debug_assert (false); + } +}; + +/** + * Ascending bootstrap pull request + */ +class asc_pull_req final : public message +{ +public: + using id_t = uint64_t; + + explicit asc_pull_req (nano::network_constants const &); + asc_pull_req (bool & error, nano::stream &, nano::message_header const &); + + void serialize (nano::stream &) const override; + bool deserialize (nano::stream &); + void visit (nano::message_visitor &) const override; + + static std::size_t size (nano::message_header const &); + + /** + * Update payload size stored in header + * IMPORTANT: Must be called after any update to the payload + */ + void update_header (); + + void serialize_payload (nano::stream &) const; + void deserialize_payload (nano::stream &); + +private: // Debug + /** + * Asserts that payload type is consistent with actual payload + */ + bool verify_consistency () const; + +public: // Payload definitions + class blocks_payload + { + public: + void serialize (nano::stream &) const; + void deserialize (nano::stream &); + + public: + nano::hash_or_account start{ 0 }; + uint8_t count{ 0 }; + }; + + class account_info_payload + { + public: + void serialize (nano::stream &) const; + void deserialize (nano::stream &); + + public: + nano::hash_or_account target{ 0 }; + }; + +public: // Payload + /** Currently unused, allows extensions in the future */ + asc_pull_type type{ asc_pull_type::invalid }; + id_t id{ 0 }; + + /** Payload depends on `asc_pull_type` */ + std::variant payload; + +public: + /** Size of message without payload */ + constexpr static std::size_t partial_size = sizeof (type) + sizeof (id); +}; + +/** + * Ascending bootstrap pull response + */ +class asc_pull_ack final : public message +{ +public: + using id_t = asc_pull_req::id_t; + + explicit asc_pull_ack (nano::network_constants const &); + asc_pull_ack (bool & error, nano::stream &, nano::message_header const &); + + void serialize (nano::stream &) const override; + bool deserialize (nano::stream &); + void visit (nano::message_visitor &) const override; + + static std::size_t size (nano::message_header const &); + + /** + * Update payload size stored in header + * IMPORTANT: Must be called after any update to the payload + */ + void update_header (); + + void serialize_payload (nano::stream &) const; + void deserialize_payload (nano::stream &); + +private: // Debug + /** + * Asserts that payload type is consistent with actual payload + */ + bool verify_consistency () const; + +public: // Payload definitions + class blocks_payload + { + public: + void serialize (nano::stream &) const; + void deserialize (nano::stream &); + + public: + std::vector> blocks{}; + + public: + /* Header allows for 16 bit extensions; 65535 bytes / 500 bytes (block size with some future margin) ~ 131 */ + constexpr static std::size_t max_blocks = 128; + }; + + class account_info_payload + { + public: + void serialize (nano::stream &) const; + void deserialize (nano::stream &); + + public: + nano::account account{ 0 }; + nano::block_hash account_open{ 0 }; + nano::block_hash account_head{ 0 }; + uint64_t account_block_count{ 0 }; + nano::block_hash account_conf_frontier{ 0 }; + uint64_t account_conf_height{ 0 }; + }; + +public: // Payload + /** Currently unused, allows extensions in the future */ + asc_pull_type type{ asc_pull_type::invalid }; + id_t id{ 0 }; + + /** Payload depends on `asc_pull_type` */ + std::variant payload; + +public: + /** Size of message without payload */ + constexpr static std::size_t partial_size = sizeof (type) + sizeof (id); +}; + class message_visitor { public: @@ -398,6 +572,14 @@ public: { default_handler (message); } + virtual void asc_pull_req (nano::asc_pull_req const & message) + { + default_handler (message); + } + virtual void asc_pull_ack (nano::asc_pull_ack const & message) + { + default_handler (message); + } virtual void default_handler (nano::message const &){}; }; } \ No newline at end of file diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 8aaecce0..924a9155 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -21,7 +21,6 @@ nano::network::network (nano::node & node_a, uint16_t port_a) : } }, buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer resolver (node_a.io_ctx), - limiter (node_a.config.bandwidth_limit_burst_ratio, node_a.config.bandwidth_limit), tcp_message_manager (node_a.config.tcp_incoming_connections_max), node (node_a), publish_filter (256 * 1024), @@ -410,13 +409,14 @@ public: channel (channel_a) { } + void keepalive (nano::keepalive const & message_a) override { if (node.config.logging.network_keepalive_logging ()) { node.logger.try_log (boost::str (boost::format ("Received keepalive message from %1%") % channel->to_string ())); } - node.stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in); + node.network.merge_peers (message_a.peers); // Check for special node port data @@ -430,13 +430,14 @@ public: channel->set_peering_endpoint (new_endpoint); } } + void publish (nano::publish const & message_a) override { if (node.config.logging.network_message_logging ()) { node.logger.try_log (boost::str (boost::format ("Publish message from %1% for %2%") % channel->to_string () % message_a.block->hash ().to_string ())); } - node.stats.inc (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in); + if (!node.block_processor.full ()) { node.process_active (message_a.block); @@ -447,6 +448,7 @@ public: node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in); } } + void confirm_req (nano::confirm_req const & message_a) override { if (node.config.logging.network_message_logging ()) @@ -460,7 +462,7 @@ public: node.logger.try_log (boost::str (boost::format ("Confirm_req message from %1% for %2%") % channel->to_string () % message_a.block->hash ().to_string ())); } } - node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::in); + // Don't load nodes with disabled voting if (node.config.enable_voting && node.wallets.reps ().voting > 0) { @@ -474,45 +476,51 @@ public: } } } + void confirm_ack (nano::confirm_ack const & message_a) override { if (node.config.logging.network_message_logging ()) { node.logger.try_log (boost::str (boost::format ("Received confirm_ack message from %1% for %2% timestamp %3%") % channel->to_string () % message_a.vote->hashes_string () % std::to_string (message_a.vote->timestamp ()))); } - node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in); + if (!message_a.vote->account.is_zero ()) { node.vote_processor.vote (message_a.vote, channel); } } + void bulk_pull (nano::bulk_pull const &) override { debug_assert (false); } + void bulk_pull_account (nano::bulk_pull_account const &) override { debug_assert (false); } + void bulk_push (nano::bulk_push const &) override { debug_assert (false); } + void frontier_req (nano::frontier_req const &) override { debug_assert (false); } + void node_id_handshake (nano::node_id_handshake const & message_a) override { node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); } + void telemetry_req (nano::telemetry_req const & message_a) override { if (node.config.logging.network_telemetry_logging ()) { node.logger.try_log (boost::str (boost::format ("Telemetry_req message from %1%") % channel->to_string ())); } - node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in); // Send an empty telemetry_ack if we do not want, just to acknowledge that we have received the message to // remove any timeouts on the server side waiting for a message. @@ -524,27 +532,42 @@ public: } channel->send (telemetry_ack, nullptr, nano::buffer_drop_policy::no_socket_drop); } + void telemetry_ack (nano::telemetry_ack const & message_a) override { if (node.config.logging.network_telemetry_logging ()) { node.logger.try_log (boost::str (boost::format ("Received telemetry_ack message from %1%") % channel->to_string ())); } - node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in); + if (node.telemetry) { node.telemetry->set (message_a, *channel); } } + + void asc_pull_req (nano::asc_pull_req const & message) override + { + node.bootstrap_server.request (message, channel); + } + + void asc_pull_ack (nano::asc_pull_ack const & message) override + { + // TODO: Process in ascending bootstrap client + } + +private: nano::node & node; std::shared_ptr channel; }; } -void nano::network::process_message (nano::message const & message_a, std::shared_ptr const & channel_a) +void nano::network::process_message (nano::message const & message, std::shared_ptr const & channel) { - network_message_visitor visitor (node, channel_a); - message_a.visit (visitor); + node.stats.inc (nano::stat::type::message, nano::to_stat_detail (message.header.type), nano::stat::dir::in); + + network_message_visitor visitor (node, channel); + message.visit (visitor); } // Send keepalives to all the peers we've been notified of @@ -727,7 +750,7 @@ std::shared_ptr nano::network::find_node_id (nano::acc return result; } -nano::endpoint nano::network::endpoint () +nano::endpoint nano::network::endpoint () const { return nano::endpoint (boost::asio::ip::address_v6::loopback (), port); } @@ -808,11 +831,6 @@ void nano::network::erase (nano::transport::channel const & channel_a) } } -void nano::network::set_bandwidth_params (double limit_burst_ratio_a, std::size_t limit_a) -{ - limiter.reset (limit_burst_ratio_a, limit_a); -} - nano::message_buffer_manager::message_buffer_manager (nano::stat & stats_a, std::size_t size, std::size_t count) : stats (stats_a), free (count), diff --git a/nano/node/network.hpp b/nano/node/network.hpp index c87c0e15..3df60735 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -11,6 +11,7 @@ #include #include #include + namespace nano { class channel; @@ -115,11 +116,13 @@ private: std::unordered_map cookies_per_ip; std::size_t max_cookies_per_ip; }; + class network final { public: network (nano::node &, uint16_t); ~network (); + nano::networks id; void start (); void stop (); @@ -148,7 +151,7 @@ public: bool not_a_peer (nano::endpoint const &, bool); // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &, bool = false); - std::deque> list (std::size_t, uint8_t = 0, bool = true); + std::deque> list (std::size_t max_count = 0, uint8_t = 0, bool = true); std::deque> list_non_pr (std::size_t); // Desired fanout for a given scale std::size_t fanout (float scale = 1.0f) const; @@ -158,7 +161,7 @@ public: std::unordered_set> random_set (std::size_t, uint8_t = 0, bool = false) const; // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (bool = false); - nano::endpoint endpoint (); + nano::endpoint endpoint () const; void cleanup (std::chrono::steady_clock::time_point const &); void ongoing_cleanup (); // Node ID cookies cleanup @@ -169,7 +172,6 @@ public: float size_sqrt () const; bool empty () const; void erase (nano::transport::channel const &); - void set_bandwidth_params (double, std::size_t); static std::string to_string (nano::networks); private: @@ -180,7 +182,6 @@ public: nano::message_buffer_manager buffer_container; boost::asio::ip::udp::resolver resolver; std::vector packet_processing_threads; - nano::bandwidth_limiter limiter; nano::peer_exclusion excluded_peers; nano::tcp_message_manager tcp_message_manager; nano::node & node; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 36a42825..8bc559e4 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -31,6 +31,10 @@ extern unsigned char nano_bootstrap_weights_beta[]; extern std::size_t nano_bootstrap_weights_beta_size; } +/* + * Configs + */ + nano::backlog_population::config nano::nodeconfig_to_backlog_population_config (const nano::node_config & config) { nano::backlog_population::config cfg; @@ -53,6 +57,20 @@ nano::hinted_scheduler::config nano::nodeconfig_to_hinted_scheduler_config (cons return cfg; } +nano::outbound_bandwidth_limiter::config nano::outbound_bandwidth_limiter_config (const nano::node_config & config) +{ + outbound_bandwidth_limiter::config cfg; + cfg.standard_limit = config.bandwidth_limit; + cfg.standard_burst_ratio = config.bandwidth_limit_burst_ratio; + cfg.bootstrap_limit = config.bootstrap_bandwidth_limit; + cfg.bootstrap_burst_ratio = config.bootstrap_bandwidth_burst_ratio; + return cfg; +} + +/* + * node + */ + void nano::node::keepalive (std::string const & address_a, uint16_t port_a) { auto node_l (shared_from_this ()); @@ -148,12 +166,14 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co gap_cache (*this), ledger (store, stats, network_params.ledger, flags_a.generate_cache), checker (config.signature_checker_threads), + outbound_limiter{ outbound_bandwidth_limiter_config (config) }, // empty `config.peering_port` means the user made no port choice at all; // otherwise, any value is considered, with `0` having the special meaning of 'let the OS pick a port instead' // network (*this, config.peering_port.has_value () ? *config.peering_port : 0), telemetry (std::make_shared (network, workers, observers.telemetry, stats, network_params, flags.disable_ongoing_telemetry_requests)), bootstrap_initiator (*this), + bootstrap_server{ store, ledger, network_params.network, stats }, // BEWARE: `bootstrap` takes `network.port` instead of `config.peering_port` because when the user doesn't specify // a peering port and wants the OS to pick one, the picking happens when `network` gets initialized // (if UDP is active, otherwise it happens when `bootstrap` gets initialized), so then for TCP traffic @@ -749,6 +769,7 @@ void nano::node::start () final_generator.start (); backlog.start (); hinting.start (); + bootstrap_server.start (); } void nano::node::stop () @@ -775,6 +796,7 @@ void nano::node::stop () { websocket_server->stop (); } + bootstrap_server.stop (); bootstrap_initiator.stop (); tcp_listener.stop (); port_mapping.stop (); @@ -1500,7 +1522,7 @@ void nano::node::set_bandwidth_params (std::size_t limit, double ratio) { config.bandwidth_limit_burst_ratio = ratio; config.bandwidth_limit = limit; - network.set_bandwidth_params (limit, ratio); + outbound_limiter.reset (limit, ratio); logger.always_log (boost::str (boost::format ("set_bandwidth_params(%1%, %2%)") % limit % ratio)); } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 77f8d7b7..018629ed 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -5,10 +5,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -59,6 +61,7 @@ std::unique_ptr collect_container_info (rep_crawler & backlog_population::config nodeconfig_to_backlog_population_config (node_config const &); vote_cache::config nodeconfig_to_vote_cache_config (node_config const &, node_flags const &); hinted_scheduler::config nodeconfig_to_hinted_scheduler_config (node_config const &); +outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &); class node final : public std::enable_shared_from_this { @@ -154,9 +157,11 @@ public: nano::gap_cache gap_cache; nano::ledger ledger; nano::signature_checker checker; + nano::outbound_bandwidth_limiter outbound_limiter; nano::network network; std::shared_ptr telemetry; nano::bootstrap_initiator bootstrap_initiator; + nano::bootstrap_server bootstrap_server; nano::transport::tcp_listener tcp_listener; boost::filesystem::path application_path; nano::node_observers observers; diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 167c4d32..eb2d8829 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -115,8 +115,13 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const toml.put ("use_memory_pools", use_memory_pools, "If true, allocate memory from memory pools. Enabling this may improve performance. Memory is never released to the OS.\ntype:bool"); toml.put ("confirmation_history_size", confirmation_history_size, "Maximum confirmation history size. If tracking the rate of block confirmations, the websocket feature is recommended instead.\ntype:uint64"); toml.put ("active_elections_size", active_elections_size, "Number of active elections. Elections beyond this limit have limited survival time.\nWarning: modifying this value may result in a lower confirmation rate.\ntype:uint64,[250..]"); + toml.put ("bandwidth_limit", bandwidth_limit, "Outbound traffic limit in bytes/sec after which messages will be dropped.\nNote: changing to unlimited bandwidth (0) is not recommended for limited connections.\ntype:uint64"); toml.put ("bandwidth_limit_burst_ratio", bandwidth_limit_burst_ratio, "Burst ratio for outbound traffic shaping.\ntype:double"); + + toml.put ("bootstrap_bandwidth_limit", bootstrap_bandwidth_limit, "Outbound bootstrap traffic limit in bytes/sec after which messages will be dropped.\nNote: changing to unlimited bandwidth (0) is not recommended for limited connections.\ntype:uint64"); + toml.put ("bootstrap_bandwidth_burst_ratio", bootstrap_bandwidth_burst_ratio, "Burst ratio for outbound bootstrap traffic.\ntype:double"); + toml.put ("conf_height_processor_batch_min_time", conf_height_processor_batch_min_time.count (), "Minimum write batching time when there are blocks pending confirmation height.\ntype:milliseconds"); toml.put ("backup_before_upgrade", backup_before_upgrade, "Backup the ledger database before performing upgrades.\nWarning: uses more disk storage and increases startup time when upgrading.\ntype:bool"); toml.put ("max_work_generate_multiplier", max_work_generate_multiplier, "Maximum allowed difficulty multiplier for work generation.\ntype:double,[1..]"); @@ -360,8 +365,13 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) toml.get ("use_memory_pools", use_memory_pools); toml.get ("confirmation_history_size", confirmation_history_size); toml.get ("active_elections_size", active_elections_size); + toml.get ("bandwidth_limit", bandwidth_limit); toml.get ("bandwidth_limit_burst_ratio", bandwidth_limit_burst_ratio); + + toml.get ("bootstrap_bandwidth_limit", bootstrap_bandwidth_limit); + toml.get ("bootstrap_bandwidth_burst_ratio", bootstrap_bandwidth_burst_ratio); + toml.get ("backup_before_upgrade", backup_before_upgrade); auto conf_height_processor_batch_min_time_l (conf_height_processor_batch_min_time.count ()); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 10f04e9f..a1f1fa41 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -95,6 +95,10 @@ public: std::size_t bandwidth_limit{ 10 * 1024 * 1024 }; /** By default, allow bursts of 15MB/s (not sustainable) */ double bandwidth_limit_burst_ratio{ 3. }; + /** Default boostrap outbound traffic limit is 16MB/s ~ 128Mbit/s */ + std::size_t bootstrap_bandwidth_limit{ 16 * 1024 * 1024 }; + /** Bootstrap traffic does not need bursts */ + double bootstrap_bandwidth_burst_ratio{ 1. }; std::chrono::milliseconds conf_height_processor_batch_min_time{ 50 }; bool backup_before_upgrade{ false }; double max_work_generate_multiplier{ 64. }; diff --git a/nano/node/transport/message_deserializer.cpp b/nano/node/transport/message_deserializer.cpp index d97466d6..2874efe0 100644 --- a/nano/node/transport/message_deserializer.cpp +++ b/nano/node/transport/message_deserializer.cpp @@ -178,6 +178,14 @@ std::unique_ptr nano::transport::message_deserializer::deserializ { return deserialize_frontier_req (stream, header); } + case nano::message_type::asc_pull_req: + { + return deserialize_asc_pull_req (stream, header); + } + case nano::message_type::asc_pull_ack: + { + return deserialize_asc_pull_ack (stream, header); + } default: { status = parse_status::invalid_message_type; @@ -191,7 +199,7 @@ std::unique_ptr nano::transport::message_deserializer::deserial { auto error = false; auto incoming = std::make_unique (error, stream, header); - if (!error && at_end (stream)) + if (!error && nano::at_end (stream)) { return incoming; } @@ -206,7 +214,7 @@ std::unique_ptr nano::transport::message_deserializer::deserializ { auto error = false; auto incoming = std::make_unique (error, stream, header, digest_a, &block_uniquer_m); - if (!error && at_end (stream)) + if (!error && nano::at_end (stream)) { release_assert (incoming->block); if (!network_constants_m.work.validate_entry (*incoming->block)) @@ -229,7 +237,7 @@ std::unique_ptr nano::transport::message_deserializer::deseri { auto error = false; auto incoming = std::make_unique (error, stream, header, &block_uniquer_m); - if (!error && at_end (stream)) + if (!error && nano::at_end (stream)) { if (incoming->block == nullptr || !network_constants_m.work.validate_entry (*incoming->block)) { @@ -251,7 +259,7 @@ std::unique_ptr nano::transport::message_deserializer::deseri { auto error = false; auto incoming = std::make_unique (error, stream, header, &vote_uniquer_m); - if (!error && at_end (stream)) + if (!error && nano::at_end (stream)) { return incoming; } @@ -266,7 +274,7 @@ std::unique_ptr nano::transport::message_deserializer:: { bool error = false; auto incoming = std::make_unique (error, stream, header); - if (!error && at_end (stream)) + if (!error && nano::at_end (stream)) { return incoming; } @@ -303,7 +311,7 @@ std::unique_ptr nano::transport::message_deserializer::deserial { bool error = false; auto incoming = std::make_unique (error, stream, header); - if (!error && at_end (stream)) + if (!error && nano::at_end (stream)) { return incoming; } @@ -318,7 +326,7 @@ std::unique_ptr nano::transport::message_deserializer:: { bool error = false; auto incoming = std::make_unique (error, stream, header); - if (!error && at_end (stream)) + if (!error && nano::at_end (stream)) { return incoming; } @@ -333,7 +341,7 @@ std::unique_ptr nano::transport::message_deserializer::deser { bool error = false; auto incoming = std::make_unique (error, stream, header); - if (!error && at_end (stream)) + if (!error && nano::at_end (stream)) { return incoming; } @@ -350,15 +358,41 @@ std::unique_ptr nano::transport::message_deserializer::deserial return std::make_unique (header); } -bool nano::transport::message_deserializer::at_end (nano::stream & stream) +std::unique_ptr nano::transport::message_deserializer::deserialize_asc_pull_req (nano::stream & stream, const nano::message_header & header) { - uint8_t junk; - auto end (nano::try_read (stream, junk)); - return end; + bool error = false; + auto incoming = std::make_unique (error, stream, header); + // Intentionally not checking if at the end of stream, because these messages support backwards/forwards compatibility + if (!error) + { + return incoming; + } + else + { + status = parse_status::invalid_asc_pull_req_message; + } + return {}; } -nano::stat::detail nano::transport::message_deserializer::parse_status_to_stat_detail () +std::unique_ptr nano::transport::message_deserializer::deserialize_asc_pull_ack (nano::stream & stream, const nano::message_header & header) { + bool error = false; + auto incoming = std::make_unique (error, stream, header); + // Intentionally not checking if at the end of stream, because these messages support backwards/forwards compatibility + if (!error) + { + return incoming; + } + else + { + status = parse_status::invalid_asc_pull_ack_message; + } + return {}; +} + +nano::stat::detail nano::transport::message_deserializer::to_stat_detail (parse_status status) +{ + // Keep additional `break` for readability switch (status) { case parse_status::none: @@ -366,84 +400,133 @@ nano::stat::detail nano::transport::message_deserializer::parse_status_to_stat_d break; case parse_status::insufficient_work: return stat::detail::insufficient_work; + break; case parse_status::invalid_header: return stat::detail::invalid_header; + break; case parse_status::invalid_message_type: return stat::detail::invalid_message_type; + break; case parse_status::invalid_keepalive_message: return stat::detail::invalid_keepalive_message; + break; case parse_status::invalid_publish_message: return stat::detail::invalid_publish_message; + break; case parse_status::invalid_confirm_req_message: return stat::detail::invalid_confirm_req_message; + break; case parse_status::invalid_confirm_ack_message: return stat::detail::invalid_confirm_ack_message; + break; case parse_status::invalid_node_id_handshake_message: return stat::detail::invalid_node_id_handshake_message; + break; case parse_status::invalid_telemetry_req_message: return stat::detail::invalid_telemetry_req_message; + break; case parse_status::invalid_telemetry_ack_message: return stat::detail::invalid_telemetry_ack_message; + break; case parse_status::invalid_bulk_pull_message: return stat::detail::invalid_bulk_pull_message; + break; case parse_status::invalid_bulk_pull_account_message: return stat::detail::invalid_bulk_pull_account_message; + break; case parse_status::invalid_frontier_req_message: return stat::detail::invalid_frontier_req_message; + break; + case parse_status::invalid_asc_pull_req_message: + return stat::detail::invalid_asc_pull_req_message; + break; + case parse_status::invalid_asc_pull_ack_message: + return stat::detail::invalid_asc_pull_ack_message; + break; case parse_status::invalid_network: return stat::detail::invalid_network; + break; case parse_status::outdated_version: return stat::detail::outdated_version; + break; case parse_status::duplicate_publish_message: return stat::detail::duplicate_publish; + break; case parse_status::message_size_too_big: return stat::detail::message_too_big; + break; } return {}; } -std::string nano::transport::message_deserializer::parse_status_to_string () +std::string nano::transport::message_deserializer::to_string (parse_status status) { + // Keep additional `break` for readability switch (status) { case parse_status::none: return "none"; + break; case parse_status::success: return "success"; + break; case parse_status::insufficient_work: return "insufficient_work"; + break; case parse_status::invalid_header: return "invalid_header"; + break; case parse_status::invalid_message_type: return "invalid_message_type"; + break; case parse_status::invalid_keepalive_message: return "invalid_keepalive_message"; + break; case parse_status::invalid_publish_message: return "invalid_publish_message"; + break; case parse_status::invalid_confirm_req_message: return "invalid_confirm_req_message"; + break; case parse_status::invalid_confirm_ack_message: return "invalid_confirm_ack_message"; + break; case parse_status::invalid_node_id_handshake_message: return "invalid_node_id_handshake_message"; + break; case parse_status::invalid_telemetry_req_message: return "invalid_telemetry_req_message"; + break; case parse_status::invalid_telemetry_ack_message: return "invalid_telemetry_ack_message"; + break; case parse_status::invalid_bulk_pull_message: return "invalid_bulk_pull_message"; + break; case parse_status::invalid_bulk_pull_account_message: return "invalid_bulk_pull_account_message"; + break; case parse_status::invalid_frontier_req_message: return "invalid_frontier_req_message"; + break; + case parse_status::invalid_asc_pull_req_message: + return "invalid_asc_pull_req_message"; + break; + case parse_status::invalid_asc_pull_ack_message: + return "invalid_asc_pull_ack_message"; + break; case parse_status::invalid_network: return "invalid_network"; + break; case parse_status::outdated_version: return "outdated_version"; + break; case parse_status::duplicate_publish_message: return "duplicate_publish_message"; + break; case parse_status::message_size_too_big: return "message_size_too_big"; + break; } return "n/a"; } diff --git a/nano/node/transport/message_deserializer.hpp b/nano/node/transport/message_deserializer.hpp index 67f15314..bfacf273 100644 --- a/nano/node/transport/message_deserializer.hpp +++ b/nano/node/transport/message_deserializer.hpp @@ -35,6 +35,8 @@ namespace transport invalid_bulk_pull_message, invalid_bulk_pull_account_message, invalid_frontier_req_message, + invalid_asc_pull_req_message, + invalid_asc_pull_ack_message, invalid_network, outdated_version, duplicate_publish_message, @@ -76,24 +78,25 @@ namespace transport std::unique_ptr deserialize_bulk_pull_account (nano::stream &, nano::message_header const &); std::unique_ptr deserialize_bulk_push (nano::stream &, nano::message_header const &); std::unique_ptr deserialize_frontier_req (nano::stream &, nano::message_header const &); - - static bool at_end (nano::stream &); + std::unique_ptr deserialize_asc_pull_req (nano::stream &, nano::message_header const &); + std::unique_ptr deserialize_asc_pull_ack (nano::stream &, nano::message_header const &); std::shared_ptr> read_buffer; - public: - std::string parse_status_to_string (); - stat::detail parse_status_to_stat_detail (); - private: // Constants static constexpr std::size_t HEADER_SIZE = 8; - static constexpr std::size_t MAX_MESSAGE_SIZE = 1024 * 4; + static constexpr std::size_t MAX_MESSAGE_SIZE = 1024 * 65; private: // Dependencies nano::network_constants const & network_constants_m; nano::network_filter & publish_filter_m; nano::block_uniquer & block_uniquer_m; nano::vote_uniquer & vote_uniquer_m; + + public: + static stat::detail to_stat_detail (parse_status); + static std::string to_string (parse_status); }; + } } diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 304e08b7..3796fb0b 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -204,11 +204,13 @@ void nano::transport::tcp_server::receive_message () if (ec) { // IO error or critical error when deserializing message - this_l->node->stats.inc (nano::stat::type::error, this_l->message_deserializer->parse_status_to_stat_detail ()); + this_l->node->stats.inc (nano::stat::type::error, nano::transport::message_deserializer::to_stat_detail (this_l->message_deserializer->status)); this_l->stop (); - return; } - this_l->received_message (std::move (message)); + else + { + this_l->received_message (std::move (message)); + } }); } @@ -223,7 +225,7 @@ void nano::transport::tcp_server::received_message (std::unique_ptrstatus != transport::message_deserializer::parse_status::success); - node->stats.inc (nano::stat::type::error, message_deserializer->parse_status_to_stat_detail ()); + node->stats.inc (nano::stat::type::error, nano::transport::message_deserializer::to_stat_detail (message_deserializer->status)); if (message_deserializer->status == transport::message_deserializer::parse_status::duplicate_publish_message) { node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish); @@ -264,18 +266,8 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrtype () == nano::socket::type_t::undefined && !node->flags.disable_bootstrap_listener && node->tcp_listener.bootstrap_count < node->config.bootstrap_connections_max) + if (!allow_bootstrap) { - ++node->tcp_listener.bootstrap_count; - socket->type_set (nano::socket::type_t::bootstrap); - return true; + return false; } - return false; + if (node->flags.disable_bootstrap_listener) + { + return false; + } + if (node->tcp_listener.bootstrap_count >= node->config.bootstrap_connections_max) + { + return false; + } + if (socket->type () != nano::socket::type_t::undefined) + { + return false; + } + + ++node->tcp_listener.bootstrap_count; + socket->type_set (nano::socket::type_t::bootstrap); + return true; } bool nano::transport::tcp_server::to_realtime_connection (nano::account const & node_id) diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 4c6a8ff1..6ba2d239 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -115,6 +115,8 @@ private: void frontier_req (nano::frontier_req const &) override; void telemetry_req (nano::telemetry_req const &) override; void telemetry_ack (nano::telemetry_ack const &) override; + void asc_pull_req (nano::asc_pull_req const &) override; + void asc_pull_ack (nano::asc_pull_ack const &) override; private: tcp_server & server; diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 8337934c..0a67f1db 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -51,13 +51,13 @@ nano::transport::channel::channel (nano::node & node_a) : set_network_version (node_a.network_params.network.protocol_version); } -void nano::transport::channel::send (nano::message & message_a, std::function const & callback_a, nano::buffer_drop_policy drop_policy_a) +void nano::transport::channel::send (nano::message & message_a, std::function const & callback_a, nano::buffer_drop_policy drop_policy_a, nano::bandwidth_limit_type limiter_type) { auto buffer (message_a.to_shared_const_buffer ()); auto detail = nano::to_stat_detail (message_a.header.type); auto is_droppable_by_limiter = drop_policy_a == nano::buffer_drop_policy::limiter; - auto should_drop (node.network.limiter.should_drop (buffer.size ())); - if (!is_droppable_by_limiter || !should_drop) + auto should_pass (node.outbound_limiter.should_pass (buffer.size (), limiter_type)); + if (!is_droppable_by_limiter || should_pass) { send_buffer (buffer, callback_a, drop_policy_a); node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out); @@ -209,20 +209,3 @@ bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool } return result; } - -using namespace std::chrono_literals; - -nano::bandwidth_limiter::bandwidth_limiter (double const limit_burst_ratio_a, std::size_t const limit_a) : - bucket (static_cast (limit_a * limit_burst_ratio_a), limit_a) -{ -} - -bool nano::bandwidth_limiter::should_drop (std::size_t const & message_size_a) -{ - return !bucket.try_consume (nano::narrow_cast (message_size_a)); -} - -void nano::bandwidth_limiter::reset (double const limit_burst_ratio_a, std::size_t const limit_a) -{ - bucket.reset (static_cast (limit_a * limit_burst_ratio_a), limit_a); -} diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index a1ee6e49..11ec8849 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -11,18 +12,6 @@ namespace nano { -class bandwidth_limiter final -{ -public: - // initialize with limit 0 = unbounded - bandwidth_limiter (double, std::size_t); - bool should_drop (std::size_t const &); - void reset (double, std::size_t); - -private: - nano::rate::token_bucket bucket; -}; - namespace transport { nano::endpoint map_endpoint_to_v6 (nano::endpoint const &); @@ -52,7 +41,7 @@ namespace transport virtual ~channel () = default; virtual std::size_t hash_code () const = 0; virtual bool operator== (nano::transport::channel const &) const = 0; - void send (nano::message & message_a, std::function const & callback_a = nullptr, nano::buffer_drop_policy policy_a = nano::buffer_drop_policy::limiter); + void send (nano::message & message_a, std::function const & callback_a = nullptr, nano::buffer_drop_policy policy_a = nano::buffer_drop_policy::limiter, nano::bandwidth_limit_type = nano::bandwidth_limit_type::standard); // TODO: investigate clang-tidy warning about default parameters on virtual/override functions // virtual void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) = 0; diff --git a/nano/secure/common.cpp b/nano/secure/common.cpp index 2dee0b4c..7f5638c2 100644 --- a/nano/secure/common.cpp +++ b/nano/secure/common.cpp @@ -222,13 +222,6 @@ nano::keypair::keypair (std::string const & prv_a) ed25519_publickey (prv.bytes.data (), pub.bytes.data ()); } -// Serialize a block prefixed with an 8-bit typecode -void nano::serialize_block (nano::stream & stream_a, nano::block const & block_a) -{ - write (stream_a, block_a.type ()); - block_a.serialize (stream_a); -} - nano::account_info::account_info (nano::block_hash const & head_a, nano::account const & representative_a, nano::block_hash const & open_block_a, nano::amount const & balance_a, uint64_t modified_a, uint64_t block_count_a, nano::epoch epoch_a) : head (head_a), representative (representative_a), diff --git a/nano/secure/ledger.cpp b/nano/secure/ledger.cpp index fdaff753..7b165efe 100644 --- a/nano/secure/ledger.cpp +++ b/nano/secure/ledger.cpp @@ -595,7 +595,7 @@ void ledger_processor::receive_block (nano::receive_block & block_a) if (result.code == nano::process_result::progress) { auto account (ledger.store.frontier.get (transaction, block_a.hashables.previous)); - result.code = account.is_zero () ? nano::process_result::gap_previous : nano::process_result::progress; //Have we seen the previous block? No entries for account at all (Harmless) + result.code = account.is_zero () ? nano::process_result::gap_previous : nano::process_result::progress; // Have we seen the previous block? No entries for account at all (Harmless) if (result.code == nano::process_result::progress) { // Validate block if not verified outside of ledger @@ -1081,7 +1081,6 @@ bool nano::ledger::rollback (nano::write_transaction const & transaction_a, nano return rollback (transaction_a, block_a, rollback_list); } -// Return account containing hash nano::account nano::ledger::account (nano::transaction const & transaction_a, nano::block_hash const & hash_a) const { return store.block.account (transaction_a, hash_a); @@ -1108,6 +1107,19 @@ nano::account nano::ledger::account_safe (nano::transaction const & transaction_ } } +nano::account nano::ledger::account_safe (const nano::transaction & transaction, const nano::block_hash & hash) const +{ + auto block = store.block.get (transaction, hash); + if (block) + { + return store.block.account_calculated (*block); + } + else + { + return { 0 }; + } +} + // Return amount decrease or increase for block nano::uint128_t nano::ledger::amount (nano::transaction const & transaction_a, nano::account const & account_a) { diff --git a/nano/secure/ledger.hpp b/nano/secure/ledger.hpp index 3eb14b46..96561beb 100644 --- a/nano/secure/ledger.hpp +++ b/nano/secure/ledger.hpp @@ -28,8 +28,19 @@ class ledger final { public: ledger (nano::store &, nano::stat &, nano::ledger_constants & constants, nano::generate_cache const & = nano::generate_cache ()); + /** + * Return account containing hash, expects that block hash exists in ledger + */ nano::account account (nano::transaction const &, nano::block_hash const &) const; + /** + * For non-prunning nodes same as `ledger::account()` + * For prunning nodes ensures that block hash exists, otherwise returns zero account + */ nano::account account_safe (nano::transaction const &, nano::block_hash const &, bool &) const; + /** + * Return account containing hash, returns zero account if account can not be found + */ + nano::account account_safe (nano::transaction const &, nano::block_hash const &) const; nano::uint128_t amount (nano::transaction const &, nano::account const &); nano::uint128_t amount (nano::transaction const &, nano::block_hash const &); /** Safe for previous block, but block hash_a must exist */ diff --git a/nano/secure/store.cpp b/nano/secure/store.cpp index f4757cb0..e6fd573b 100644 --- a/nano/secure/store.cpp +++ b/nano/secure/store.cpp @@ -167,4 +167,32 @@ auto nano::unchecked_store::equal_range (nano::transaction const & transaction, auto nano::unchecked_store::full_range (nano::transaction const & transaction) -> std::pair { return std::make_pair (begin (transaction), end ()); +} + +std::optional nano::account_store::get (const nano::transaction & transaction, const nano::account & account) +{ + nano::account_info info; + bool error = get (transaction, account, info); + if (!error) + { + return info; + } + else + { + return std::nullopt; + } +} + +std::optional nano::confirmation_height_store::get (const nano::transaction & transaction, const nano::account & account) +{ + nano::confirmation_height_info info; + bool error = get (transaction, account, info); + if (!error) + { + return info; + } + else + { + return std::nullopt; + } } \ No newline at end of file diff --git a/nano/secure/store.hpp b/nano/secure/store.hpp index e0b80987..a360c550 100644 --- a/nano/secure/store.hpp +++ b/nano/secure/store.hpp @@ -643,6 +643,7 @@ class account_store public: virtual void put (nano::write_transaction const &, nano::account const &, nano::account_info const &) = 0; virtual bool get (nano::transaction const &, nano::account const &, nano::account_info &) = 0; + std::optional get (nano::transaction const &, nano::account const &); virtual void del (nano::write_transaction const &, nano::account const &) = 0; virtual bool exists (nano::transaction const &, nano::account const &) = 0; virtual size_t count (nano::transaction const &) = 0; @@ -732,7 +733,7 @@ public: * Ruturns true on error, false on success. */ virtual bool get (nano::transaction const & transaction_a, nano::account const & account_a, nano::confirmation_height_info & confirmation_height_info_a) = 0; - + std::optional get (nano::transaction const & transaction_a, nano::account const & account_a); virtual bool exists (nano::transaction const & transaction_a, nano::account const & account_a) const = 0; virtual void del (nano::write_transaction const & transaction_a, nano::account const & account_a) = 0; virtual uint64_t count (nano::transaction const & transaction_a) = 0; diff --git a/nano/test_common/testutil.cpp b/nano/test_common/testutil.cpp index bb1584a5..24cbd710 100644 --- a/nano/test_common/testutil.cpp +++ b/nano/test_common/testutil.cpp @@ -38,6 +38,23 @@ void nano::test::wait_peer_connections (nano::test::system & system_a) wait_peer_count (false); } +nano::hash_or_account nano::test::random_hash_or_account () +{ + nano::hash_or_account random_hash; + nano::random_pool::generate_block (random_hash.bytes.data (), random_hash.bytes.size ()); + return random_hash; +} + +nano::block_hash nano::test::random_hash () +{ + return nano::test::random_hash_or_account ().as_block_hash (); +} + +nano::account nano::test::random_account () +{ + return nano::test::random_hash_or_account ().as_account (); +} + bool nano::test::process (nano::node & node, std::vector> blocks) { auto const transaction = node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending }); diff --git a/nano/test_common/testutil.hpp b/nano/test_common/testutil.hpp index 362c5cae..ae403c2e 100644 --- a/nano/test_common/testutil.hpp +++ b/nano/test_common/testutil.hpp @@ -272,6 +272,19 @@ namespace test void wait_peer_connections (nano::test::system &); + /** + * Generate a random block hash + */ + nano::hash_or_account random_hash_or_account (); + /** + * Generate a random block hash + */ + nano::block_hash random_hash (); + /** + * Generate a random block hash + */ + nano::account random_account (); + /** Convenience function to call `node::process` function for multiple blocks at once. @return true if all blocks were successfully processed and inserted into ledger