From 196198e5db5943c3db638c0fcfb3e598f5e3a379 Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Sun, 5 Mar 2023 14:19:11 +0000 Subject: [PATCH 1/5] Disables disk usage from the unchecked_map class. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With the ascending bootstrap process the number of blocks needing to be tracked can be limited and remain in memory. The maximum number of blocks is set to 64k though typically far fewer are actually needed. Co-authored-by: Piotr Wójcik <3044353+pwojcikdev@users.noreply.github.com> Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com> --- nano/core_test/block_store.cpp | 3 +- nano/core_test/bootstrap.cpp | 12 +- nano/core_test/confirmation_height.cpp | 8 +- nano/core_test/ledger.cpp | 37 +++--- nano/core_test/node.cpp | 10 +- nano/core_test/unchecked_map.cpp | 55 ++++---- nano/nano_node/entry.cpp | 6 +- nano/node/cli.cpp | 4 +- nano/node/json_handler.cpp | 11 +- nano/node/node.cpp | 15 +-- nano/node/unchecked_map.cpp | 177 ++++++------------------- nano/node/unchecked_map.hpp | 60 ++++----- nano/qt/qt.cpp | 2 +- nano/rpc_test/rpc.cpp | 8 +- nano/slow_test/node.cpp | 2 +- 15 files changed, 154 insertions(+), 256 deletions(-) diff --git a/nano/core_test/block_store.cpp b/nano/core_test/block_store.cpp index c37a8e4a..52396d51 100644 --- a/nano/core_test/block_store.cpp +++ b/nano/core_test/block_store.cpp @@ -450,9 +450,8 @@ TEST (block_store, empty_bootstrap) auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); nano::unchecked_map unchecked{ *store, system.stats, false }; ASSERT_TRUE (!store->init_error ()); - auto transaction (store->tx_begin_read ()); size_t count = 0; - unchecked.for_each (transaction, [&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + unchecked.for_each ([&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) { ++count; }); ASSERT_EQ (count, 0); diff --git a/nano/core_test/bootstrap.cpp b/nano/core_test/bootstrap.cpp index 899fa4ed..f2a13dc9 100644 --- a/nano/core_test/bootstrap.cpp +++ b/nano/core_test/bootstrap.cpp @@ -127,12 +127,12 @@ TEST (bulk_pull, ascending_one_hash) auto connection = std::make_shared (socket, system.nodes[0]); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis->hash (); - req->end = nano::dev::genesis->hash (); + req->end.clear (); req->header.flag_set (nano::message_header::bulk_pull_ascending_flag); auto request = std::make_shared (connection, std::move (req)); auto block_out1 = request->get_next (); ASSERT_NE (nullptr, block_out1); - ASSERT_EQ (block_out1->hash (), nano::dev::genesis->hash ()); + ASSERT_EQ (block_out1->hash (), block1->hash ()); ASSERT_EQ (nullptr, request->get_next ()); } @@ -158,7 +158,7 @@ TEST (bulk_pull, ascending_two_account) auto socket = std::make_shared (node, nano::transport::socket::endpoint_type_t::server); auto connection = std::make_shared (socket, system.nodes[0]); auto req = std::make_unique (nano::dev::network_params.network); - req->start = nano::dev::genesis->hash (); + req->start = nano::dev::genesis->account (); req->end.clear (); req->header.flag_set (nano::message_header::bulk_pull_ascending_flag); auto request = std::make_shared (connection, std::move (req)); @@ -172,7 +172,7 @@ TEST (bulk_pull, ascending_two_account) } /** - Tests that the `end' value is respected in the bulk_pull message + Tests that the `end' value is respected in the bulk_pull message when the ascending flag is used. */ TEST (bulk_pull, ascending_end) { @@ -1351,7 +1351,7 @@ TEST (bootstrap_processor, lazy_pruning_missing_block) ASSERT_FALSE (node2->ledger.block_or_pruned_exists (state_open->hash ())); { auto transaction (node2->store.tx_begin_read ()); - ASSERT_TRUE (node2->unchecked.exists (transaction, nano::unchecked_key (send2->root ().as_block_hash (), send2->hash ()))); + ASSERT_TRUE (node2->unchecked.exists (nano::unchecked_key (send2->root ().as_block_hash (), send2->hash ()))); } // Insert missing block node2->process_active (send1); @@ -2045,7 +2045,7 @@ TEST (bulk, DISABLED_genesis_pruning) ASSERT_EQ (1, node2->ledger.cache.block_count); { auto transaction (node2->store.tx_begin_write ()); - node2->unchecked.clear (transaction); + node2->unchecked.clear (); } // Insert pruned blocks node2->process_active (send1); diff --git a/nano/core_test/confirmation_height.cpp b/nano/core_test/confirmation_height.cpp index 124dbacc..ff2698d5 100644 --- a/nano/core_test/confirmation_height.cpp +++ b/nano/core_test/confirmation_height.cpp @@ -391,14 +391,14 @@ TEST (confirmation_height, gap_bootstrap) node1.process_active (receive2); // Waits for the unchecked_map to process the 4 blocks added to the block_processor, saving them in the unchecked table auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return !node1.unchecked.get (transaction_a, block_hash_a).empty (); + return !node1.unchecked.get (block_hash_a).empty (); }; ASSERT_TIMELY (5s, check_block_is_listed (node1.store.tx_begin_read (), receive2->previous ())); // Confirmation heights should not be updated { auto transaction (node1.store.tx_begin_read ()); - auto unchecked_count (node1.unchecked.count (transaction)); + auto unchecked_count (node1.unchecked.count ()); ASSERT_EQ (unchecked_count, 2); nano::confirmation_height_info confirmation_height_info; @@ -410,7 +410,7 @@ TEST (confirmation_height, gap_bootstrap) // Now complete the chain where the block comes in on the bootstrap network. node1.block_processor.add (open1); - ASSERT_TIMELY (5s, node1.unchecked.count (node1.store.tx_begin_read ()) == 0); + ASSERT_TIMELY (5s, node1.unchecked.count () == 0); // Confirmation height should be unchanged and unchecked should now be 0 { auto transaction = node1.store.tx_begin_read (); @@ -544,7 +544,7 @@ TEST (confirmation_height, gap_live) // This should confirm the open block and the source of the receive blocks auto transaction = node->store.tx_begin_read (); - auto unchecked_count = node->unchecked.count (transaction); + auto unchecked_count = node->unchecked.count (); ASSERT_EQ (unchecked_count, 0); nano::confirmation_height_info confirmation_height_info{}; diff --git a/nano/core_test/ledger.cpp b/nano/core_test/ledger.cpp index 8efab6b9..666707d1 100644 --- a/nano/core_test/ledger.cpp +++ b/nano/core_test/ledger.cpp @@ -3999,10 +3999,10 @@ TEST (ledger, epoch_open_pending) ASSERT_EQ (nano::process_result::gap_epoch_open_pending, process_result.code); node1.block_processor.add (epoch_open); // Waits for the block to get saved in the database - ASSERT_TIMELY (10s, 1 == node1.unchecked.count (node1.store.tx_begin_read ())); + ASSERT_TIMELY (10s, 1 == node1.unchecked.count ()); ASSERT_FALSE (node1.ledger.block_or_pruned_exists (epoch_open->hash ())); // Open block should be inserted into unchecked - auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), nano::hash_or_account (epoch_open->account ()).hash); + auto blocks = node1.unchecked.get (nano::hash_or_account (epoch_open->account ()).hash); ASSERT_EQ (blocks.size (), 1); ASSERT_EQ (blocks[0].block->full_hash (), epoch_open->full_hash ()); // New block to process epoch open @@ -4293,8 +4293,8 @@ TEST (ledger, unchecked_epoch) node1.block_processor.add (epoch1); { // Waits for the epoch1 block to pass through block_processor and unchecked.put queues - ASSERT_TIMELY (10s, 1 == node1.unchecked.count (node1.store.tx_begin_read ())); - auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), epoch1->previous ()); + ASSERT_TIMELY (10s, 1 == node1.unchecked.count ()); + auto blocks = node1.unchecked.get (epoch1->previous ()); ASSERT_EQ (blocks.size (), 1); } node1.block_processor.add (send1); @@ -4302,7 +4302,7 @@ TEST (ledger, unchecked_epoch) ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), epoch1->hash ())); { // Waits for the last blocks to pass through block_processor and unchecked.put queues - ASSERT_TIMELY (10s, 0 == node1.unchecked.count (node1.store.tx_begin_read ())); + ASSERT_TIMELY (10s, 0 == node1.unchecked.count ()); auto info = node1.ledger.account_info (node1.store.tx_begin_read (), destination.pub); ASSERT_TRUE (info); ASSERT_EQ (info->epoch (), nano::epoch::epoch_1); @@ -4367,8 +4367,8 @@ TEST (ledger, unchecked_epoch_invalid) node1.block_processor.add (epoch2); { // Waits for the last blocks to pass through block_processor and unchecked.put queues - ASSERT_TIMELY (10s, 2 == node1.unchecked.count (node1.store.tx_begin_read ())); - auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), epoch1->previous ()); + ASSERT_TIMELY (10s, 2 == node1.unchecked.count ()); + auto blocks = node1.unchecked.get (epoch1->previous ()); ASSERT_EQ (blocks.size (), 2); } node1.block_processor.add (send1); @@ -4378,9 +4378,9 @@ TEST (ledger, unchecked_epoch_invalid) { auto transaction = node1.store.tx_begin_read (); ASSERT_FALSE (node1.store.block.exists (transaction, epoch1->hash ())); - auto unchecked_count = node1.unchecked.count (transaction); + auto unchecked_count = node1.unchecked.count (); ASSERT_EQ (unchecked_count, 0); - ASSERT_EQ (unchecked_count, node1.unchecked.count (transaction)); + ASSERT_EQ (unchecked_count, node1.unchecked.count ()); auto info = node1.ledger.account_info (transaction, destination.pub); ASSERT_TRUE (info); ASSERT_NE (info->epoch (), nano::epoch::epoch_1); @@ -4434,15 +4434,15 @@ TEST (ledger, unchecked_open) node1.block_processor.add (open1); { // Waits for the last blocks to pass through block_processor and unchecked.put queues - ASSERT_TIMELY (5s, 1 == node1.unchecked.count (node1.store.tx_begin_read ())); - // When open1 existists in unchecked, we know open2 has been processed. - auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), open1->source ()); + ASSERT_TIMELY (10s, 1 == node1.unchecked.count ()); + // Get the next peer for attempting a tcp bootstrap connection + auto blocks = node1.unchecked.get (open1->source ()); ASSERT_EQ (blocks.size (), 1); } node1.block_processor.add (send1); // Waits for the send1 block to pass through block_processor and unchecked.put queues ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), open1->hash ())); - ASSERT_EQ (0, node1.unchecked.count (node1.store.tx_begin_read ())); + ASSERT_EQ (0, node1.unchecked.count ()); } TEST (ledger, unchecked_receive) @@ -4493,13 +4493,13 @@ TEST (ledger, unchecked_receive) node1.block_processor.add (send1); node1.block_processor.add (receive1); auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return !node1.unchecked.get (transaction_a, block_hash_a).empty (); + return !node1.unchecked.get (block_hash_a).empty (); }; // Previous block for receive1 is unknown, signature cannot be validated { // Waits for the last blocks to pass through block_processor and unchecked.put queues ASSERT_TIMELY (15s, check_block_is_listed (node1.store.tx_begin_read (), receive1->previous ())); - auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), receive1->previous ()); + auto blocks = node1.unchecked.get (receive1->previous ()); ASSERT_EQ (blocks.size (), 1); } // Waits for the open1 block to pass through block_processor and unchecked.put queues @@ -4508,12 +4508,12 @@ TEST (ledger, unchecked_receive) // Previous block for receive1 is known, signature was validated { auto transaction = node1.store.tx_begin_read (); - auto blocks (node1.unchecked.get (transaction, receive1->source ())); + auto blocks (node1.unchecked.get (receive1->source ())); ASSERT_EQ (blocks.size (), 1); } node1.block_processor.add (send2); ASSERT_TIMELY (10s, node1.store.block.exists (node1.store.tx_begin_read (), receive1->hash ())); - ASSERT_EQ (0, node1.unchecked.count (node1.store.tx_begin_read ())); + ASSERT_EQ (0, node1.unchecked.count ()); } TEST (ledger, confirmation_height_not_updated) @@ -5542,8 +5542,7 @@ TEST (ledger, migrate_lmdb_to_rocksdb) uint16_t port = 100; nano::lmdb::store store{ logger, path / "data.ldb", nano::dev::constants }; nano::unchecked_map unchecked{ store, system.stats, false }; - nano::stats stats{}; - nano::ledger ledger{ store, stats, nano::dev::constants }; + nano::ledger ledger{ store, system.stats, nano::dev::constants }; nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits::max () }; std::shared_ptr send = nano::state_block_builder () diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 7b541beb..d7d11e1c 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2984,7 +2984,7 @@ TEST (node, block_processor_signatures) node1.process_active (receive2); node1.process_active (receive3); ASSERT_TIMELY (5s, node1.block (receive2->hash ()) != nullptr); // Implies send1, send2, send3, receive1. - ASSERT_TIMELY (5s, node1.unchecked.count (node1.store.tx_begin_read ()) == 0); + ASSERT_TIMELY (5s, node1.unchecked.count () == 0); ASSERT_EQ (nullptr, node1.block (receive3->hash ())); // Invalid signer ASSERT_EQ (nullptr, node1.block (send4->hash ())); // Invalid signature via process_active ASSERT_EQ (nullptr, node1.block (send5->hash ())); // Invalid signature via unchecked @@ -3289,17 +3289,17 @@ TEST (node, unchecked_cleanup) ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); node.process_active (open); // Waits for the open block to get saved in the database - ASSERT_TIMELY (15s, 1 == node.unchecked.count (node.store.tx_begin_read ())); + ASSERT_TIMELY (15s, 1 == node.unchecked.count ()); node.config.unchecked_cutoff_time = std::chrono::seconds (2); - ASSERT_EQ (1, node.unchecked.count (node.store.tx_begin_read ())); + ASSERT_EQ (1, node.unchecked.count ()); std::this_thread::sleep_for (std::chrono::seconds (1)); node.unchecked_cleanup (); ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); - ASSERT_EQ (1, node.unchecked.count (node.store.tx_begin_read ())); + ASSERT_EQ (1, node.unchecked.count ()); std::this_thread::sleep_for (std::chrono::seconds (2)); node.unchecked_cleanup (); ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); - ASSERT_EQ (0, node.unchecked.count (node.store.tx_begin_read ())); + ASSERT_EQ (0, node.unchecked.count ()); } /** This checks that a node can be opened (without being blocked) when a write lock is held elsewhere */ diff --git a/nano/core_test/unchecked_map.cpp b/nano/core_test/unchecked_map.cpp index ee7a3f63..c90f9d2b 100644 --- a/nano/core_test/unchecked_map.cpp +++ b/nano/core_test/unchecked_map.cpp @@ -73,18 +73,18 @@ TEST (block_store, one_bootstrap) .build_shared (); unchecked.put (block1->hash (), nano::unchecked_info{ block1 }); auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return unchecked.get (transaction_a, block_hash_a).size () > 0; + return unchecked.get (block_hash_a).size () > 0; }; // Waits for the block1 to get saved in the database ASSERT_TIMELY (10s, check_block_is_listed (store->tx_begin_read (), block1->hash ())); auto transaction = store->tx_begin_read (); std::vector dependencies; - unchecked.for_each (transaction, [&dependencies] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + unchecked.for_each ([&dependencies] (nano::unchecked_key const & key, nano::unchecked_info const & info) { dependencies.push_back (key.key ()); }); auto hash1 = dependencies[0]; ASSERT_EQ (block1->hash (), hash1); - auto blocks = unchecked.get (transaction, hash1); + auto blocks = unchecked.get (hash1); ASSERT_EQ (1, blocks.size ()); auto block2 = blocks[0].block; ASSERT_EQ (*block1, *block2); @@ -109,25 +109,24 @@ TEST (unchecked, simple) .work (5) .build_shared (); // Asserts the block wasn't added yet to the unchecked table - auto block_listing1 = unchecked.get (store->tx_begin_read (), block->previous ()); + auto block_listing1 = unchecked.get (block->previous ()); ASSERT_TRUE (block_listing1.empty ()); // Enqueues a block to be saved on the unchecked table unchecked.put (block->previous (), nano::unchecked_info (block)); // Waits for the block to get written in the database - auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return unchecked.get (transaction_a, block_hash_a).size () > 0; + auto check_block_is_listed = [&] (nano::block_hash const & block_hash_a) { + return unchecked.get (block_hash_a).size () > 0; }; - ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->previous ())); - auto transaction = store->tx_begin_write (); + ASSERT_TIMELY (5s, check_block_is_listed (block->previous ())); // Retrieves the block from the database - auto block_listing2 = unchecked.get (transaction, block->previous ()); + auto block_listing2 = unchecked.get (block->previous ()); ASSERT_FALSE (block_listing2.empty ()); // Asserts the added block is equal to the retrieved one ASSERT_EQ (*block, *(block_listing2[0].block)); // Deletes the block from the database - unchecked.del (transaction, nano::unchecked_key (block->previous (), block->hash ())); + unchecked.del (nano::unchecked_key (block->previous (), block->hash ())); // Asserts the block is deleted - auto block_listing3 = unchecked.get (transaction, block->previous ()); + auto block_listing3 = unchecked.get (block->previous ()); ASSERT_TRUE (block_listing3.empty ()); } @@ -154,19 +153,19 @@ TEST (unchecked, multiple) .work (5) .build_shared (); // Asserts the block wasn't added yet to the unchecked table - auto block_listing1 = unchecked.get (store->tx_begin_read (), block->previous ()); + auto block_listing1 = unchecked.get (block->previous ()); ASSERT_TRUE (block_listing1.empty ()); // Enqueues the first block unchecked.put (block->previous (), nano::unchecked_info (block)); // Enqueues a second block unchecked.put (block->source (), nano::unchecked_info (block)); - auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return unchecked.get (transaction_a, block_hash_a).size () > 0; + auto check_block_is_listed = [&] (nano::block_hash const & block_hash_a) { + return unchecked.get (block_hash_a).size () > 0; }; // Waits for and asserts the first block gets saved in the database - ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->previous ())); + ASSERT_TIMELY (5s, check_block_is_listed (block->previous ())); // Waits for and asserts the second block gets saved in the database - ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->source ())); + ASSERT_TIMELY (5s, check_block_is_listed (block->source ())); } // This test ensures that a block can't occur twice in the unchecked table. @@ -187,19 +186,19 @@ TEST (unchecked, double_put) .work (5) .build_shared (); // Asserts the block wasn't added yet to the unchecked table - auto block_listing1 = unchecked.get (store->tx_begin_read (), block->previous ()); + auto block_listing1 = unchecked.get (block->previous ()); ASSERT_TRUE (block_listing1.empty ()); // Enqueues the block to be saved in the unchecked table unchecked.put (block->previous (), nano::unchecked_info (block)); // Enqueues the block again in an attempt to have it there twice unchecked.put (block->previous (), nano::unchecked_info (block)); - auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return unchecked.get (transaction_a, block_hash_a).size () > 0; + auto check_block_is_listed = [&] (nano::block_hash const & block_hash_a) { + return unchecked.get (block_hash_a).size () > 0; }; // Waits for and asserts the block was added at least once - ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->previous ())); + ASSERT_TIMELY (5s, check_block_is_listed (block->previous ())); // Asserts the block was added at most once -- this is objective of this test. - auto block_listing2 = unchecked.get (store->tx_begin_read (), block->previous ()); + auto block_listing2 = unchecked.get (block->previous ()); ASSERT_EQ (block_listing2.size (), 1); } @@ -251,8 +250,7 @@ TEST (unchecked, multiple_get) // we cannot trust the count() method if the backend is rocksdb auto count_unchecked_blocks_one_by_one = [&store, &unchecked] () { size_t count = 0; - auto transaction = store->tx_begin_read (); - unchecked.for_each (transaction, [&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + unchecked.for_each ([&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) { ++count; }); return count; @@ -263,8 +261,7 @@ TEST (unchecked, multiple_get) std::vector unchecked1; // Asserts the entries will be found for the provided key - auto transaction = store->tx_begin_read (); - auto unchecked1_blocks = unchecked.get (transaction, block1->previous ()); + auto unchecked1_blocks = unchecked.get (block1->previous ()); ASSERT_EQ (unchecked1_blocks.size (), 3); for (auto & i : unchecked1_blocks) { @@ -276,7 +273,7 @@ TEST (unchecked, multiple_get) ASSERT_TRUE (std::find (unchecked1.begin (), unchecked1.end (), block3->hash ()) != unchecked1.end ()); std::vector unchecked2; // Asserts the entries will be found for the provided key - auto unchecked2_blocks = unchecked.get (transaction, block1->hash ()); + auto unchecked2_blocks = unchecked.get (block1->hash ()); ASSERT_EQ (unchecked2_blocks.size (), 2); for (auto & i : unchecked2_blocks) { @@ -286,14 +283,14 @@ TEST (unchecked, multiple_get) ASSERT_TRUE (std::find (unchecked2.begin (), unchecked2.end (), block1->hash ()) != unchecked2.end ()); ASSERT_TRUE (std::find (unchecked2.begin (), unchecked2.end (), block2->hash ()) != unchecked2.end ()); // Asserts the entry is found by the key and the payload is saved - auto unchecked3 = unchecked.get (transaction, block2->previous ()); + auto unchecked3 = unchecked.get (block2->previous ()); ASSERT_EQ (unchecked3.size (), 1); ASSERT_EQ (unchecked3[0].block->hash (), block2->hash ()); // Asserts the entry is found by the key and the payload is saved - auto unchecked4 = unchecked.get (transaction, block3->hash ()); + auto unchecked4 = unchecked.get (block3->hash ()); ASSERT_EQ (unchecked4.size (), 1); ASSERT_EQ (unchecked4[0].block->hash (), block3->hash ()); // Asserts no entry is found for a block that wasn't added - auto unchecked5 = unchecked.get (transaction, block2->hash ()); + auto unchecked5 = unchecked.get (block2->hash ()); ASSERT_EQ (unchecked5.size (), 0); } diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index f04b2f55..e859d2b4 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -426,7 +426,7 @@ int main (int argc, char * const * argv) } // Check all unchecked keys for matching frontier hashes. Indicates an issue with process_batch algorithm - node->unchecked.for_each (transaction, [&frontier_hashes] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + node->unchecked.for_each ([&frontier_hashes] (nano::unchecked_key const & key, nano::unchecked_info const & info) { auto it = frontier_hashes.find (key.key ()); if (it != frontier_hashes.cend ()) { @@ -999,7 +999,7 @@ int main (int argc, char * const * argv) if (timer_l.after_deadline (std::chrono::seconds (15))) { timer_l.restart (); - std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked), %3% remaining") % node->ledger.cache.block_count % node->unchecked.count (node->store.tx_begin_read ()) % node->block_processor.size ()) << std::endl; + std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked), %3% remaining") % node->ledger.cache.block_count % node->unchecked.count () % node->block_processor.size ()) << std::endl; } } @@ -1847,7 +1847,7 @@ int main (int argc, char * const * argv) if (timer_l.after_deadline (std::chrono::seconds (60))) { timer_l.restart (); - std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked)") % node.node->ledger.cache.block_count % node.node->unchecked.count (node.node->store.tx_begin_read ())) << std::endl; + std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked)") % node.node->ledger.cache.block_count % node.node->unchecked.count ()) << std::endl; } } diff --git a/nano/node/cli.cpp b/nano/node/cli.cpp index d780ccce..db25e74f 100644 --- a/nano/node/cli.cpp +++ b/nano/node/cli.cpp @@ -213,7 +213,7 @@ bool copy_database (boost::filesystem::path const & data_path, boost::program_op auto & store (node.node->store); if (vm.count ("unchecked_clear")) { - node.node->unchecked.clear (store.tx_begin_write ()); + node.node->unchecked.clear (); } if (vm.count ("clear_send_ids")) { @@ -491,7 +491,7 @@ std::error_code nano::handle_node_options (boost::program_options::variables_map if (!node.node->init_error ()) { auto transaction (node.node->store.tx_begin_write ()); - node.node->unchecked.clear (transaction); + node.node->unchecked.clear (); std::cout << "Unchecked blocks deleted" << std::endl; } else diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 20548def..fa6ac917 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -1422,7 +1422,7 @@ void nano::json_handler::block_account () void nano::json_handler::block_count () { response_l.put ("count", std::to_string (node.ledger.cache.block_count)); - response_l.put ("unchecked", std::to_string (node.unchecked.count (node.store.tx_begin_read ()))); + response_l.put ("unchecked", std::to_string (node.unchecked.count ())); response_l.put ("cemented", std::to_string (node.ledger.cache.cemented_count)); if (node.flags.enable_pruning) { @@ -4115,7 +4115,7 @@ void nano::json_handler::unchecked () boost::property_tree::ptree unchecked; auto transaction (node.store.tx_begin_read ()); node.unchecked.for_each ( - transaction, [&unchecked, &json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + [&unchecked, &json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) { if (json_block_l) { boost::property_tree::ptree block_node_l; @@ -4137,7 +4137,7 @@ void nano::json_handler::unchecked_clear () { node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto transaction (rpc_l->node.store.tx_begin_write ({ tables::unchecked })); - rpc_l->node.unchecked.clear (transaction); + rpc_l->node.unchecked.clear (); rpc_l->response_l.put ("success", ""); rpc_l->response_errors (); })); @@ -4151,7 +4151,7 @@ void nano::json_handler::unchecked_get () { bool done = false; node.unchecked.for_each ( - node.store.tx_begin_read (), [&] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + [&] (nano::unchecked_key const & key, nano::unchecked_info const & info) { if (key.hash == hash) { response_l.put ("modified_timestamp", std::to_string (info.modified ())); @@ -4196,7 +4196,8 @@ void nano::json_handler::unchecked_keys () boost::property_tree::ptree unchecked; auto transaction (node.store.tx_begin_read ()); node.unchecked.for_each ( - transaction, key, [&unchecked, json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + key, + [&unchecked, json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) { boost::property_tree::ptree entry; entry.put ("key", key.key ().to_string ()); entry.put ("hash", info.block->hash ().to_string ()); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index ea551db9..e20a4eb3 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -214,10 +214,9 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co block_publisher.connect (block_processor); gap_tracker.connect (block_processor); process_live_dispatcher.connect (block_processor); - unchecked.use_memory = [this] () { return ledger.bootstrap_weight_reached (); }; - unchecked.satisfied = [this] (nano::unchecked_info const & info) { + unchecked.satisfied.add ([this] (nano::unchecked_info const & info) { this->block_processor.add (info.block); - }; + }); inactive_vote_cache.rep_weight_query = [this] (nano::account const & rep) { return ledger.weight (rep); @@ -442,7 +441,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co if (!flags.disable_unchecked_drop && !use_bootstrap_weight && !flags.read_only) { auto const transaction (store.tx_begin_write ({ tables::unchecked })); - unchecked.clear (transaction); + unchecked.clear (); logger.always_log ("Dropping unchecked blocks"); } } @@ -971,7 +970,7 @@ void nano::node::unchecked_cleanup () auto const transaction (store.tx_begin_read ()); // Max 1M records to clean, max 2 minutes reading to prevent slow i/o systems issues unchecked.for_each ( - transaction, [this, &digests, &cleaning_list, &now] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + [this, &digests, &cleaning_list, &now] (nano::unchecked_key const & key, nano::unchecked_info const & info) { if ((now - info.modified ()) > static_cast (config.unchecked_cutoff_time.count ())) { digests.push_back (network.publish_filter.hash (info.block)); @@ -991,9 +990,9 @@ void nano::node::unchecked_cleanup () { auto key (cleaning_list.front ()); cleaning_list.pop_front (); - if (unchecked.exists (transaction, key)) + if (unchecked.exists (key)) { - unchecked.del (transaction, key); + unchecked.del (key); } } } @@ -1497,7 +1496,7 @@ nano::telemetry_data nano::node::local_telemetry () const telemetry_data.bandwidth_cap = config.bandwidth_limit; telemetry_data.protocol_version = network_params.network.protocol_version; telemetry_data.uptime = std::chrono::duration_cast (std::chrono::steady_clock::now () - startup_time).count (); - telemetry_data.unchecked_count = unchecked.count (ledger.store.tx_begin_read ()); + telemetry_data.unchecked_count = unchecked.count (); telemetry_data.genesis_block = network_params.ledger.genesis->hash (); telemetry_data.peer_count = nano::narrow_cast (network.size ()); telemetry_data.account_count = ledger.cache.account_count; diff --git a/nano/node/unchecked_map.cpp b/nano/node/unchecked_map.cpp index 1a4ea823..d00def25 100644 --- a/nano/node/unchecked_map.cpp +++ b/nano/node/unchecked_map.cpp @@ -22,113 +22,66 @@ nano::unchecked_map::~unchecked_map () void nano::unchecked_map::put (nano::hash_or_account const & dependency, nano::unchecked_info const & info) { - nano::unique_lock lock{ mutex }; - buffer.push_back (std::make_pair (dependency, info)); - lock.unlock (); + nano::lock_guard lock{ entries_mutex }; + nano::unchecked_key key{ dependency, info.block->hash () }; + entries.get ().insert ({ key, info }); + if (entries.size () > mem_block_count_max) + { + entries.get ().pop_front (); + } stats.inc (nano::stat::type::unchecked, nano::stat::detail::put); - condition.notify_all (); // Notify run () } -void nano::unchecked_map::for_each ( -nano::transaction const & transaction, std::function action, std::function predicate) +void nano::unchecked_map::for_each (std::function action, std::function predicate) { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) + for (auto i = entries.begin (), n = entries.end (); predicate () && i != n; ++i) { - for (auto [i, n] = store.unchecked.full_range (transaction); predicate () && i != n; ++i) - { - action (i->first, i->second); - } - } - else - { - for (auto i = entries->begin (), n = entries->end (); predicate () && i != n; ++i) - { - action (i->key, i->info); - } + action (i->key, i->info); } } -void nano::unchecked_map::for_each ( -nano::transaction const & transaction, nano::hash_or_account const & dependency, std::function action, std::function predicate) +void nano::unchecked_map::for_each (nano::hash_or_account const & dependency, std::function action, std::function predicate) { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) + for (auto i = entries.template get ().lower_bound (nano::unchecked_key{ dependency, 0 }), n = entries.template get ().end (); predicate () && i != n && i->key.key () == dependency.as_block_hash (); ++i) { - for (auto [i, n] = store.unchecked.equal_range (transaction, dependency.as_block_hash ()); predicate () && i->first.key () == dependency.as_block_hash () && i != n; ++i) - { - action (i->first, i->second); - } - } - else - { - for (auto i = entries->template get ().lower_bound (nano::unchecked_key{ dependency, 0 }), n = entries->template get ().end (); predicate () && i != n && i->key.key () == dependency.as_block_hash (); ++i) - { - action (i->key, i->info); - } + action (i->key, i->info); } } -std::vector nano::unchecked_map::get (nano::transaction const & transaction, nano::block_hash const & hash) +std::vector nano::unchecked_map::get (nano::block_hash const & hash) { std::vector result; - for_each (transaction, hash, [&result] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + for_each (hash, [&result] (nano::unchecked_key const & key, nano::unchecked_info const & info) { result.push_back (info); }); return result; } -bool nano::unchecked_map::exists (nano::transaction const & transaction, nano::unchecked_key const & key) const +bool nano::unchecked_map::exists (nano::unchecked_key const & key) const { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) - { - return store.unchecked.exists (transaction, key); - } - else - { - return entries->template get ().count (key) != 0; - } + return entries.get ().count (key) != 0; } -void nano::unchecked_map::del (nano::write_transaction const & transaction, nano::unchecked_key const & key) +void nano::unchecked_map::del (nano::unchecked_key const & key) { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) - { - store.unchecked.del (transaction, key); - } - else - { - auto erased = entries->template get ().erase (key); - release_assert (erased); - } + auto erased = entries.get ().erase (key); + debug_assert (erased); } -void nano::unchecked_map::clear (nano::write_transaction const & transaction) +void nano::unchecked_map::clear () { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) - { - store.unchecked.clear (transaction); - } - else - { - entries->clear (); - } + entries.clear (); } -size_t nano::unchecked_map::count (nano::transaction const & transaction) const +std::size_t nano::unchecked_map::count () const { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) - { - return store.unchecked.count (transaction); - } - else - { - return entries->size (); - } + return entries.size (); } void nano::unchecked_map::stop () @@ -152,37 +105,18 @@ void nano::unchecked_map::flush () void nano::unchecked_map::trigger (nano::hash_or_account const & dependency) { nano::unique_lock lock{ mutex }; - buffer.push_back (dependency); - debug_assert (buffer.back ().which () == 1); // which stands for "query". + buffer.emplace_back (dependency); lock.unlock (); stats.inc (nano::stat::type::unchecked, nano::stat::detail::trigger); condition.notify_all (); // Notify run () + stats.inc (nano::stat::type::unchecked, nano::stat::detail::trigger); } -nano::unchecked_map::item_visitor::item_visitor (unchecked_map & unchecked, nano::write_transaction const & transaction) : - unchecked{ unchecked }, - transaction{ transaction } +void nano::unchecked_map::process_queries (decltype (buffer) const & back_buffer) { -} - -void nano::unchecked_map::item_visitor::operator() (insert const & item) -{ - auto const & [dependency, info] = item; - unchecked.insert_impl (transaction, dependency, info); -} - -void nano::unchecked_map::item_visitor::operator() (query const & item) -{ - unchecked.query_impl (transaction, item.hash); -} - -void nano::unchecked_map::write_buffer (decltype (buffer) const & back_buffer) -{ - auto transaction = store.tx_begin_write (); - item_visitor visitor{ *this, transaction }; for (auto const & item : back_buffer) { - boost::apply_visitor (visitor, item); + query_impl (item.hash); } } @@ -197,7 +131,7 @@ void nano::unchecked_map::run () back_buffer.swap (buffer); writing_back_buffer = true; lock.unlock (); - write_buffer (back_buffer); + process_queries (back_buffer); lock.lock (); writing_back_buffer = false; back_buffer.clear (); @@ -212,54 +146,29 @@ void nano::unchecked_map::run () } } -void nano::unchecked_map::insert_impl (nano::write_transaction const & transaction, nano::hash_or_account const & dependency, nano::unchecked_info const & info) +void nano::unchecked_map::query_impl (nano::block_hash const & hash) { - // Check if block dependency has been satisfied while waiting to be placed in the unchecked map - if (store.block.exists (transaction, dependency.as_block_hash ())) - { - satisfied (info); - return; - } - nano::lock_guard lock{ entries_mutex }; - // Check if we should be using memory but the memory container hasn't been constructed i.e. we're transitioning from disk to memory. - if (entries == nullptr && use_memory ()) - { - auto entries_new = std::make_unique (); - for_each ( - transaction, [&entries_new] (nano::unchecked_key const & key, nano::unchecked_info const & info) { entries_new->template get ().insert ({ key, info }); }, [&] () { return entries_new->size () < mem_block_count_max; }); - clear (transaction); - entries = std::move (entries_new); - } - if (entries == nullptr) - { - store.unchecked.put (transaction, dependency, info); - } - else - { - nano::unchecked_key key{ dependency, info.block->hash () }; - entries->template get ().insert ({ key, info }); - while (entries->size () > mem_block_count_max) - { - entries->template get ().pop_front (); - } - } -} - -void nano::unchecked_map::query_impl (nano::write_transaction const & transaction, nano::block_hash const & hash) -{ - nano::lock_guard lock{ entries_mutex }; - std::deque delete_queue; - for_each (transaction, hash, [this, &delete_queue] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + for_each (hash, [this, &delete_queue] (nano::unchecked_key const & key, nano::unchecked_info const & info) { delete_queue.push_back (key); stats.inc (nano::stat::type::unchecked, nano::stat::detail::satisfied); - satisfied (info); + satisfied.notify (info); }); if (!disable_delete) { for (auto const & key : delete_queue) { - del (transaction, key); + del (key); } } } + +std::unique_ptr nano::unchecked_map::collect_container_info (const std::string & name) +{ + nano::lock_guard lock{ mutex }; + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "entries", entries.size (), sizeof (decltype (entries)::value_type) })); + composite->add_component (std::make_unique (container_info{ "queries", buffer.size (), sizeof (decltype (buffer)::value_type) })); + return composite; +} diff --git a/nano/node/unchecked_map.hpp b/nano/node/unchecked_map.hpp index 3042cdc6..cfcb8b59 100644 --- a/nano/node/unchecked_map.hpp +++ b/nano/node/unchecked_map.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -31,60 +32,50 @@ public: void put (nano::hash_or_account const & dependency, nano::unchecked_info const & info); void for_each ( - nano::transaction const & transaction, std::function action, std::function predicate = [] () { return true; }); + std::function action, std::function predicate = [] () { return true; }); void for_each ( - nano::transaction const & transaction, nano::hash_or_account const & dependency, std::function action, std::function predicate = [] () { return true; }); - std::vector get (nano::transaction const &, nano::block_hash const &); - bool exists (nano::transaction const & transaction, nano::unchecked_key const & key) const; - void del (nano::write_transaction const & transaction, nano::unchecked_key const & key); - void clear (nano::write_transaction const & transaction); - size_t count (nano::transaction const & transaction) const; + nano::hash_or_account const & dependency, std::function action, std::function predicate = [] () { return true; }); + std::vector get (nano::block_hash const &); + bool exists (nano::unchecked_key const & key) const; + void del (nano::unchecked_key const & key); + void clear (); + std::size_t count () const; void stop (); void flush (); - std::function use_memory = [] () { return true; }; - -public: // Trigger requested dependencies + /** + * Trigger requested dependencies + */ void trigger (nano::hash_or_account const & dependency); - std::function satisfied{ [] (nano::unchecked_info const &) {} }; + +public: // Events + nano::observer_set satisfied; private: - using insert = std::pair; - using query = nano::hash_or_account; - class item_visitor : boost::static_visitor<> - { - public: - item_visitor (unchecked_map & unchecked, nano::write_transaction const & transaction); - void operator() (insert const & item); - void operator() (query const & item); - unchecked_map & unchecked; - nano::write_transaction const & transaction; - }; void run (); - void insert_impl (nano::write_transaction const & transaction, nano::hash_or_account const & dependency, nano::unchecked_info const & info); - void query_impl (nano::write_transaction const & transaction, nano::block_hash const & hash); + void query_impl (nano::block_hash const & hash); + +private: // Dependencies nano::store & store; nano::stats & stats; private: bool const & disable_delete; - std::deque> buffer; - std::deque> back_buffer; + std::deque buffer; + std::deque back_buffer; bool writing_back_buffer{ false }; bool stopped{ false }; nano::condition_variable condition; nano::mutex mutex; std::thread thread; - void write_buffer (decltype (buffer) const & back_buffer); - static size_t constexpr mem_block_count_max = 256'000; + void process_queries (decltype (buffer) const & back_buffer); - friend class item_visitor; + static std::size_t constexpr mem_block_count_max = 64 * 1024; -private: // In memory store - class entry +private: + struct entry { - public: nano::unchecked_key key; nano::unchecked_info info; }; @@ -99,8 +90,11 @@ private: // In memory store mi::ordered_unique, mi::member>>>; // clang-format on - std::unique_ptr entries; + ordered_unchecked entries; mutable std::recursive_mutex entries_mutex; + +public: // Container info + std::unique_ptr collect_container_info (std::string const & name); }; } diff --git a/nano/qt/qt.cpp b/nano/qt/qt.cpp index 0f5951b2..457d57c7 100644 --- a/nano/qt/qt.cpp +++ b/nano/qt/qt.cpp @@ -923,7 +923,7 @@ std::string nano_qt::status::text () std::string count_string; { auto size (wallet.wallet_m->wallets.node.ledger.cache.block_count.load ()); - unchecked = wallet.wallet_m->wallets.node.unchecked.count (wallet.wallet_m->wallets.node.store.tx_begin_read ()); + unchecked = wallet.wallet_m->wallets.node.unchecked.count (); cemented = wallet.wallet_m->wallets.node.ledger.cache.cemented_count.load (); count_string = std::to_string (size); } diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index d247645d..fb11dd69 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -6094,7 +6094,7 @@ TEST (rpc, unchecked) node->process_active (open); node->process_active (open2); // Waits for the last block of the queue to get saved in the database - ASSERT_TIMELY (10s, 2 == node->unchecked.count (node->store.tx_begin_read ())); + ASSERT_TIMELY (10s, 2 == node->unchecked.count ()); boost::property_tree::ptree request; request.put ("action", "unchecked"); request.put ("count", 2); @@ -6135,7 +6135,7 @@ TEST (rpc, unchecked_get) .build_shared (); node->process_active (open); // Waits for the open block to get saved in the database - ASSERT_TIMELY (10s, 1 == node->unchecked.count (node->store.tx_begin_read ())); + ASSERT_TIMELY (10s, 1 == node->unchecked.count ()); boost::property_tree::ptree request{}; request.put ("action", "unchecked_get"); request.put ("hash", open->hash ().to_string ()); @@ -6175,12 +6175,12 @@ TEST (rpc, unchecked_clear) node->process_active (open); boost::property_tree::ptree request{}; // Waits for the open block to get saved in the database - ASSERT_TIMELY (10s, 1 == node->unchecked.count (node->store.tx_begin_read ())); + ASSERT_TIMELY (10s, 1 == node->unchecked.count ()); request.put ("action", "unchecked_clear"); auto response = wait_response (system, rpc_ctx, request); // Waits for the open block to get saved in the database - ASSERT_TIMELY (10s, 0 == node->unchecked.count (node->store.tx_begin_read ())); + ASSERT_TIMELY (10s, 0 == node->unchecked.count ()); } TEST (rpc, unopened) diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index f6fedfab..9200416a 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -510,7 +510,7 @@ TEST (store, unchecked_load) node.unchecked.put (i, block); } // Waits for all the blocks to get saved in the database - ASSERT_TIMELY (8000s, num_unchecked == node.unchecked.count (node.store.tx_begin_read ())); + ASSERT_TIMELY (8000s, num_unchecked == node.unchecked.count ()); } TEST (store, vote_load) From d8d4657d789f8ff5042769275654f4f478cefd2a Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Sun, 5 Mar 2023 14:44:13 +0000 Subject: [PATCH 2/5] This adds the ascending bootstrap client functionality. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Pulls blocks in an ascending order making use of network messages added in v24. - Since blocks need to be inserted in an ascending order, this greatly decreases the number of unchecked blocks needing to be tracked. - The client uses a priority-weighted tracing algorithm to pull successive iterations of blocks from peers. Co-authored-by: Piotr Wójcik <3044353+pwojcikdev@users.noreply.github.com> Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com> --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/bootstrap_ascending.cpp | 252 ++++++ nano/lib/config.hpp | 2 + nano/lib/stats_enums.hpp | 55 ++ nano/lib/threading.cpp | 3 + nano/lib/threading.hpp | 3 + nano/node/CMakeLists.txt | 2 + nano/node/bootstrap/bootstrap.hpp | 3 +- nano/node/bootstrap/bootstrap_ascending.cpp | 855 ++++++++++++++++++++ nano/node/bootstrap/bootstrap_ascending.hpp | 329 ++++++++ nano/node/bootstrap/bootstrap_attempt.cpp | 2 + 11 files changed, 1506 insertions(+), 1 deletion(-) create mode 100644 nano/core_test/bootstrap_ascending.cpp create mode 100644 nano/node/bootstrap/bootstrap_ascending.cpp create mode 100644 nano/node/bootstrap/bootstrap_ascending.hpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index dcd6734d..7c891609 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable( block_store.cpp blockprocessor.cpp bootstrap.cpp + bootstrap_ascending.cpp bootstrap_server.cpp cli.cpp confirmation_height.cpp diff --git a/nano/core_test/bootstrap_ascending.cpp b/nano/core_test/bootstrap_ascending.cpp new file mode 100644 index 00000000..6eba2795 --- /dev/null +++ b/nano/core_test/bootstrap_ascending.cpp @@ -0,0 +1,252 @@ +#include +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +namespace +{ +nano::block_hash random_hash () +{ + nano::block_hash random_hash; + nano::random_pool::generate_block (random_hash.bytes.data (), random_hash.bytes.size ()); + return random_hash; +} +} + +TEST (account_sets, construction) +{ + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; +} + +TEST (account_sets, empty_blocked) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + ASSERT_FALSE (sets.blocked (account)); +} + +TEST (account_sets, block) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + sets.block (account, random_hash ()); + ASSERT_TRUE (sets.blocked (account)); +} + +TEST (account_sets, unblock) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + auto hash = random_hash (); + sets.block (account, hash); + sets.unblock (account, hash); + ASSERT_FALSE (sets.blocked (account)); +} + +TEST (account_sets, priority_base) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + ASSERT_EQ (1.0f, sets.priority (account)); +} + +TEST (account_sets, priority_blocked) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + sets.block (account, random_hash ()); + ASSERT_EQ (0.0f, sets.priority (account)); +} + +// When account is unblocked, check that it retains it former priority +TEST (account_sets, priority_unblock_keep) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + sets.priority_up (account); + sets.priority_up (account); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial * nano::bootstrap_ascending::account_sets::priority_increase); + auto hash = random_hash (); + sets.block (account, hash); + ASSERT_EQ (0.0f, sets.priority (account)); + sets.unblock (account, hash); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial * nano::bootstrap_ascending::account_sets::priority_increase); +} + +TEST (account_sets, priority_up_down) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + sets.priority_up (account); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial); + sets.priority_down (account); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial - nano::bootstrap_ascending::account_sets::priority_decrease); +} + +// Check that priority downward saturates to 1.0f +TEST (account_sets, priority_down_sat) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + sets.priority_down (account); + ASSERT_EQ (1.0f, sets.priority (account)); +} + +// Ensure priority value is bounded +TEST (account_sets, saturate_priority) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + for (int n = 0; n < 1000; ++n) + { + sets.priority_up (account); + } + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_max); +} + +/** + * Tests the base case for returning + */ +TEST (bootstrap_ascending, account_base) +{ + nano::node_flags flags; + nano::test::system system{ 1, nano::transport::transport_type::tcp, flags }; + auto & node0 = *system.nodes[0]; + nano::state_block_builder builder; + auto send1 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .link (0) + .balance (nano::dev::constants.genesis_amount - 1) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build_shared (); + ASSERT_EQ (nano::process_result::progress, node0.process (*send1).code); + auto & node1 = *system.add_node (flags); + ASSERT_TIMELY (5s, node1.block (send1->hash ()) != nullptr); +} + +/** + * Tests that bootstrap_ascending will return multiple new blocks in-order + */ +TEST (bootstrap_ascending, account_inductive) +{ + nano::node_flags flags; + nano::test::system system{ 1, nano::transport::transport_type::tcp, flags }; + auto & node0 = *system.nodes[0]; + nano::state_block_builder builder; + auto send1 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .link (0) + .balance (nano::dev::constants.genesis_amount - 1) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build_shared (); + auto send2 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (send1->hash ()) + .representative (nano::dev::genesis_key.pub) + .link (0) + .balance (nano::dev::constants.genesis_amount - 2) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (send1->hash ())) + .build_shared (); + // std::cerr << "Genesis: " << nano::dev::genesis->hash ().to_string () << std::endl; + // std::cerr << "Send1: " << send1->hash ().to_string () << std::endl; + // std::cerr << "Send2: " << send2->hash ().to_string () << std::endl; + ASSERT_EQ (nano::process_result::progress, node0.process (*send1).code); + ASSERT_EQ (nano::process_result::progress, node0.process (*send2).code); + auto & node1 = *system.add_node (flags); + ASSERT_TIMELY (50s, node1.block (send2->hash ()) != nullptr); +} + +/** + * Tests that bootstrap_ascending will return multiple new blocks in-order + */ +TEST (bootstrap_ascending, trace_base) +{ + nano::node_flags flags; + flags.disable_legacy_bootstrap = true; + nano::test::system system{ 1, nano::transport::transport_type::tcp, flags }; + auto & node0 = *system.nodes[0]; + nano::keypair key; + nano::state_block_builder builder; + auto send1 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .link (key.pub) + .balance (nano::dev::constants.genesis_amount - 1) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build_shared (); + auto receive1 = builder.make_block () + .account (key.pub) + .previous (0) + .representative (nano::dev::genesis_key.pub) + .link (send1->hash ()) + .balance (1) + .sign (key.prv, key.pub) + .work (*system.work.generate (key.pub)) + .build_shared (); + // std::cerr << "Genesis key: " << nano::dev::genesis_key.pub.to_account () << std::endl; + // std::cerr << "Key: " << key.pub.to_account () << std::endl; + // std::cerr << "Genesis: " << nano::dev::genesis->hash ().to_string () << std::endl; + // std::cerr << "send1: " << send1->hash ().to_string () << std::endl; + // std::cerr << "receive1: " << receive1->hash ().to_string () << std::endl; + auto & node1 = *system.add_node (); + // std::cerr << "--------------- Start ---------------\n"; + ASSERT_EQ (nano::process_result::progress, node0.process (*send1).code); + ASSERT_EQ (nano::process_result::progress, node0.process (*receive1).code); + ASSERT_EQ (node1.store.pending.begin (node1.store.tx_begin_read (), nano::pending_key{ key.pub, 0 }), node1.store.pending.end ()); + // std::cerr << "node0: " << node0.network.endpoint () << std::endl; + // std::cerr << "node1: " << node1.network.endpoint () << std::endl; + ASSERT_TIMELY (10s, node1.block (receive1->hash ()) != nullptr); +} diff --git a/nano/lib/config.hpp b/nano/lib/config.hpp index 6e785bfd..c4961c87 100644 --- a/nano/lib/config.hpp +++ b/nano/lib/config.hpp @@ -383,6 +383,8 @@ public: uint8_t const protocol_version = 0x13; /** Minimum accepted protocol version */ uint8_t const protocol_version_min = 0x12; + /** Minimum accepted protocol version used when bootstrapping */ + uint8_t const bootstrap_protocol_version_min = 0x13; }; std::string get_node_toml_config_path (boost::filesystem::path const & data_path); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index f69f1b21..d9253a9c 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -45,6 +45,13 @@ enum class type : uint8_t optimistic_scheduler, handshake, + bootstrap_server_requests, + bootstrap_server_responses, + bootstrap_ascending, + bootstrap_ascending_connections, + bootstrap_ascending_thread, + bootstrap_ascending_accounts, + _last // Must be the last enum }; @@ -280,6 +287,54 @@ enum class detail : uint8_t missing_cookie, invalid_genesis, + // bootstrap ascending + missing_tag, + reply, + track, + timeout, + nothing_new, + + // bootstrap ascending connections + connect, + connect_missing, + connect_failed, + connect_success, + reuse, + + // bootstrap ascending thread + read_block, + read_block_done, + read_block_end, + read_block_error, + + // bootstrap ascending accounts + prioritize, + prioritize_failed, + block, + unblock, + unblock_failed, + + next_priority, + next_database, + next_none, + + blocking_insert, + blocking_erase_overflow, + priority_insert, + priority_erase_threshold, + priority_erase_block, + priority_erase_overflow, + deprioritize, + deprioritize_failed, + + // active + started_hinted, + started_optimistic, + + // optimistic + pop_gap, + pop_leaf, + _last // Must be the last enum }; diff --git a/nano/lib/threading.cpp b/nano/lib/threading.cpp index d857c9e9..69867033 100644 --- a/nano/lib/threading.cpp +++ b/nano/lib/threading.cpp @@ -102,6 +102,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::vote_generator_queue: thread_role_name_string = "Voting que"; break; + case nano::thread_role::name::ascending_bootstrap: + thread_role_name_string = "Bootstrap asc"; + break; case nano::thread_role::name::bootstrap_server: thread_role_name_string = "Bootstrap serv"; break; diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index f8724166..1aaab00f 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -49,6 +49,9 @@ namespace thread_role bootstrap_server, telemetry, optimistic_scheduler, + ascending_bootstrap, + bootstrap_server_requests, + bootstrap_server_responses, }; /* diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 9289b863..20a0b518 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -34,6 +34,8 @@ add_library( blockprocessor.cpp bootstrap/block_deserializer.hpp bootstrap/block_deserializer.cpp + bootstrap/bootstrap_ascending.hpp + bootstrap/bootstrap_ascending.cpp bootstrap/bootstrap_attempt.hpp bootstrap/bootstrap_attempt.cpp bootstrap/bootstrap_bulk_pull.hpp diff --git a/nano/node/bootstrap/bootstrap.hpp b/nano/node/bootstrap/bootstrap.hpp index 66852b78..5b6f62ef 100644 --- a/nano/node/bootstrap/bootstrap.hpp +++ b/nano/node/bootstrap/bootstrap.hpp @@ -27,7 +27,8 @@ enum class bootstrap_mode { legacy, lazy, - wallet_lazy + wallet_lazy, + ascending }; enum class sync_result { diff --git a/nano/node/bootstrap/bootstrap_ascending.cpp b/nano/node/bootstrap/bootstrap_ascending.cpp new file mode 100644 index 00000000..9cefe080 --- /dev/null +++ b/nano/node/bootstrap/bootstrap_ascending.cpp @@ -0,0 +1,855 @@ +#include +#include +#include +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +/* + * database_iterator + */ + +nano::bootstrap_ascending::database_iterator::database_iterator (nano::store & store_a, table_type table_a) : + store{ store_a }, + table{ table_a } +{ +} + +nano::account nano::bootstrap_ascending::database_iterator::operator* () const +{ + return current; +} + +void nano::bootstrap_ascending::database_iterator::next (nano::transaction & tx) +{ + switch (table) + { + case table_type::account: + { + auto i = current.number () + 1; + auto item = store.account.begin (tx, i); + if (item != store.account.end ()) + { + current = item->first; + } + else + { + current = { 0 }; + } + break; + } + case table_type::pending: + { + auto i = current.number () + 1; + auto item = store.pending.begin (tx, nano::pending_key{ i, 0 }); + if (item != store.pending.end ()) + { + current = item->first.account; + } + else + { + current = { 0 }; + } + break; + } + } +} + +/* + * buffered_iterator + */ + +nano::bootstrap_ascending::buffered_iterator::buffered_iterator (nano::store & store_a) : + store{ store_a }, + accounts_iterator{ store, database_iterator::table_type::account }, + pending_iterator{ store, database_iterator::table_type::pending } +{ +} + +nano::account nano::bootstrap_ascending::buffered_iterator::operator* () const +{ + return !buffer.empty () ? buffer.front () : nano::account{ 0 }; +} + +nano::account nano::bootstrap_ascending::buffered_iterator::next () +{ + if (!buffer.empty ()) + { + buffer.pop_front (); + } + else + { + fill (); + } + + return *(*this); +} + +void nano::bootstrap_ascending::buffered_iterator::fill () +{ + debug_assert (buffer.empty ()); + + // Fill half from accounts table and half from pending table + auto transaction = store.tx_begin_read (); + + for (int n = 0; n < size / 2; ++n) + { + accounts_iterator.next (transaction); + if (!(*accounts_iterator).is_zero ()) + { + buffer.push_back (*accounts_iterator); + } + } + + for (int n = 0; n < size / 2; ++n) + { + pending_iterator.next (transaction); + if (!(*pending_iterator).is_zero ()) + { + buffer.push_back (*pending_iterator); + } + } +} + +/* + * account_sets + */ + +nano::bootstrap_ascending::account_sets::account_sets (nano::stats & stats_a) : + stats{ stats_a } +{ +} + +void nano::bootstrap_ascending::account_sets::priority_up (nano::account const & account) +{ + if (!blocked (account)) + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize); + + auto iter = priorities.get ().find (account); + if (iter != priorities.get ().end ()) + { + priorities.get ().modify (iter, [] (auto & val) { + val.priority = std::min ((val.priority * account_sets::priority_increase), account_sets::priority_max); + }); + } + else + { + priorities.get ().insert ({ account, account_sets::priority_initial }); + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_insert); + + trim_overflow (); + } + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize_failed); + } +} + +void nano::bootstrap_ascending::account_sets::priority_down (nano::account const & account) +{ + auto iter = priorities.get ().find (account); + if (iter != priorities.get ().end ()) + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize); + + auto priority_new = iter->priority - account_sets::priority_decrease; + if (priority_new <= account_sets::priority_cutoff) + { + priorities.get ().erase (iter); + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_threshold); + } + else + { + priorities.get ().modify (iter, [priority_new] (auto & val) { + val.priority = priority_new; + }); + } + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize_failed); + } +} + +void nano::bootstrap_ascending::account_sets::block (nano::account const & account, nano::block_hash const & dependency) +{ + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::block); + + auto existing = priorities.get ().find (account); + auto entry = existing == priorities.get ().end () ? priority_entry{ 0, 0 } : *existing; + + priorities.get ().erase (account); + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_block); + + blocking.get ().insert ({ account, dependency, entry }); + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_insert); + + trim_overflow (); +} + +void nano::bootstrap_ascending::account_sets::unblock (nano::account const & account, std::optional const & hash) +{ + // Unblock only if the dependency is fulfilled + auto existing = blocking.get ().find (account); + if (existing != blocking.get ().end () && (!hash || existing->dependency == *hash)) + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::unblock); + + debug_assert (priorities.get ().count (account) == 0); + if (!existing->original_entry.account.is_zero ()) + { + debug_assert (existing->original_entry.account == account); + priorities.get ().insert (existing->original_entry); + } + else + { + priorities.get ().insert ({ account, account_sets::priority_initial }); + } + blocking.get ().erase (account); + + trim_overflow (); + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::unblock_failed); + } +} + +void nano::bootstrap_ascending::account_sets::timestamp (const nano::account & account, bool reset) +{ + const nano::millis_t tstamp = reset ? 0 : nano::milliseconds_since_epoch (); + + auto iter = priorities.get ().find (account); + if (iter != priorities.get ().end ()) + { + priorities.get ().modify (iter, [tstamp] (auto & entry) { + entry.timestamp = tstamp; + }); + } +} + +bool nano::bootstrap_ascending::account_sets::check_timestamp (const nano::account & account) const +{ + auto iter = priorities.get ().find (account); + if (iter != priorities.get ().end ()) + { + if (nano::milliseconds_since_epoch () - iter->timestamp < cooldown) + { + return false; + } + } + return true; +} + +void nano::bootstrap_ascending::account_sets::trim_overflow () +{ + if (priorities.size () > priorities_max) + { + // Evict the lowest priority entry + priorities.get ().erase (priorities.get ().begin ()); + + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_overflow); + } + if (blocking.size () > blocking_max) + { + // Evict the lowest priority entry + blocking.get ().erase (blocking.get ().begin ()); + + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_erase_overflow); + } +} + +nano::account nano::bootstrap_ascending::account_sets::next () +{ + if (priorities.empty ()) + { + return { 0 }; + } + + std::vector weights; + std::vector candidates; + + int iterations = 0; + while (candidates.size () < account_sets::consideration_count && iterations++ < account_sets::consideration_count * 10) + { + debug_assert (candidates.size () == weights.size ()); + + // Use a dedicated, uniformly distributed field for sampling to avoid problematic corner case when accounts in the queue are very close together + auto search = bootstrap_ascending::generate_id (); + auto iter = priorities.get ().lower_bound (search); + if (iter == priorities.get ().end ()) + { + iter = priorities.get ().begin (); + } + + if (check_timestamp (iter->account)) + { + candidates.push_back (iter->account); + weights.push_back (iter->priority); + } + } + + if (candidates.empty ()) + { + return { 0 }; // All sampled accounts are busy + } + + std::discrete_distribution dist{ weights.begin (), weights.end () }; + auto selection = dist (rng); + debug_assert (!weights.empty () && selection < weights.size ()); + auto result = candidates[selection]; + return result; +} + +bool nano::bootstrap_ascending::account_sets::blocked (nano::account const & account) const +{ + return blocking.get ().count (account) > 0; +} + +std::size_t nano::bootstrap_ascending::account_sets::priority_size () const +{ + return priorities.size (); +} + +std::size_t nano::bootstrap_ascending::account_sets::blocked_size () const +{ + return blocking.size (); +} + +float nano::bootstrap_ascending::account_sets::priority (nano::account const & account) const +{ + if (blocked (account)) + { + return 0.0f; + } + auto existing = priorities.get ().find (account); + if (existing != priorities.get ().end ()) + { + return existing->priority; + } + return account_sets::priority_cutoff; +} + +auto nano::bootstrap_ascending::account_sets::info () const -> info_t +{ + return { blocking, priorities }; +} + +std::unique_ptr nano::bootstrap_ascending::account_sets::collect_container_info (const std::string & name) +{ + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "priorities", priorities.size (), sizeof (decltype (priorities)::value_type) })); + composite->add_component (std::make_unique (container_info{ "blocking", blocking.size (), sizeof (decltype (blocking)::value_type) })); + return composite; +} + +/* + * priority_entry + */ + +nano::bootstrap_ascending::account_sets::priority_entry::priority_entry (nano::account account_a, float priority_a) : + account{ account_a }, + priority{ priority_a } +{ + id = nano::bootstrap_ascending::generate_id (); +} + +/* + * bootstrap_ascending + */ + +nano::bootstrap_ascending::bootstrap_ascending (nano::node & node_a, nano::store & store_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a) : + node{ node_a }, + store{ store_a }, + block_processor{ block_processor_a }, + ledger{ ledger_a }, + network{ network_a }, + stats{ stat_a }, + accounts{ stats }, + iterator{ store }, + limiter{ requests_limit, 1.0 }, + database_limiter{ database_requests_limit, 1.0 } +{ + // TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread + block_processor.batch_processed.add ([this] (auto const & batch) { + { + nano::lock_guard lock{ mutex }; + + auto transaction = store.tx_begin_read (); + for (auto const & [result, block] : batch) + { + debug_assert (block != nullptr); + + inspect (transaction, result, *block); + } + } + + condition.notify_all (); + }); +} + +nano::bootstrap_ascending::~bootstrap_ascending () +{ + // All threads must be stopped before destruction + debug_assert (!thread.joinable ()); + debug_assert (!timeout_thread.joinable ()); +} + +void nano::bootstrap_ascending::start () +{ + debug_assert (!thread.joinable ()); + debug_assert (!timeout_thread.joinable ()); + + thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); + run (); + }); + + timeout_thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); + run_timeouts (); + }); +} + +void nano::bootstrap_ascending::stop () +{ + nano::unique_lock lock{ mutex }; + stopped = true; + lock.unlock (); + condition.notify_all (); + nano::join_or_pass (thread); + nano::join_or_pass (timeout_thread); +} + +nano::bootstrap_ascending::id_t nano::bootstrap_ascending::generate_id () +{ + id_t id; + nano::random_pool::generate_block (reinterpret_cast (&id), sizeof (id)); + return id; +} + +void nano::bootstrap_ascending::send (std::shared_ptr channel, async_tag tag) +{ + debug_assert (tag.type == async_tag::query_type::blocks_by_hash || tag.type == async_tag::query_type::blocks_by_account); + + nano::asc_pull_req request{ node.network_params.network }; + request.id = tag.id; + request.type = nano::asc_pull_type::blocks; + + nano::asc_pull_req::blocks_payload request_payload; + request_payload.start = tag.start; + request_payload.count = pull_count; + request_payload.start_type = tag.type == async_tag::query_type::blocks_by_hash ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account; + + request.payload = request_payload; + request.update_header (); + + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out); + + // TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests + channel->send ( + request, nullptr, + nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap); +} + +size_t nano::bootstrap_ascending::priority_size () const +{ + nano::lock_guard lock{ mutex }; + return accounts.priority_size (); +} + +size_t nano::bootstrap_ascending::blocked_size () const +{ + nano::lock_guard lock{ mutex }; + return accounts.blocked_size (); +} + +/** Inspects a block that has been processed by the block processor +- Marks an account as blocked if the result code is gap source as there is no reason request additional blocks for this account until the dependency is resolved +- Marks an account as forwarded if it has been recently referenced by a block that has been inserted. + */ +void nano::bootstrap_ascending::inspect (nano::transaction const & tx, nano::process_return const & result, nano::block const & block) +{ + auto const hash = block.hash (); + + switch (result.code) + { + case nano::process_result::progress: + { + const auto account = ledger.account (tx, hash); + const auto is_send = ledger.is_send (tx, block); + + // If we've inserted any block in to an account, unmark it as blocked + accounts.unblock (account); + accounts.priority_up (account); + accounts.timestamp (account, /* reset timestamp */ true); + + if (is_send) + { + // TODO: Encapsulate this as a helper somewhere + nano::account destination{ 0 }; + switch (block.type ()) + { + case nano::block_type::send: + destination = block.destination (); + break; + case nano::block_type::state: + destination = block.link ().as_account (); + break; + default: + debug_assert (false, "unexpected block type"); + break; + } + if (!destination.is_zero ()) + { + accounts.unblock (destination, hash); // Unblocking automatically inserts account into priority set + accounts.priority_up (destination); + } + } + } + break; + case nano::process_result::gap_source: + { + const auto account = block.previous ().is_zero () ? block.account () : ledger.account (tx, block.previous ()); + const auto source = block.source ().is_zero () ? block.link ().as_block_hash () : block.source (); + + // Mark account as blocked because it is missing the source block + accounts.block (account, source); + + // TODO: Track stats + } + break; + case nano::process_result::old: + { + // TODO: Track stats + } + break; + case nano::process_result::gap_previous: + { + // TODO: Track stats + } + break; + default: // No need to handle other cases + break; + } +} + +void nano::bootstrap_ascending::wait_blockprocessor () +{ + while (!stopped && block_processor.half_full ()) + { + std::this_thread::sleep_for (500ms); // Blockprocessor is relatively slow, sleeping here instead of using conditions + } +} + +void nano::bootstrap_ascending::wait_available_request () +{ + while (!stopped && !limiter.should_pass (1)) + { + std::this_thread::sleep_for (50ms); // Give it at least some time to cooldown to avoid hitting the limit too frequently + } +} + +std::shared_ptr nano::bootstrap_ascending::available_channel () +{ + auto channels = network.random_set (32, node.network_params.network.bootstrap_protocol_version_min, /* include temporary channels */ true); + for (auto & channel : channels) + { + if (!channel->max ()) + { + return channel; + } + } + return nullptr; +} + +std::shared_ptr nano::bootstrap_ascending::wait_available_channel () +{ + std::shared_ptr channel; + while (!stopped && !(channel = available_channel ())) + { + std::this_thread::sleep_for (100ms); + } + return channel; +} + +nano::account nano::bootstrap_ascending::available_account () +{ + { + auto account = accounts.next (); + if (!account.is_zero ()) + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_priority); + return account; + } + } + + if (database_limiter.should_pass (1)) + { + auto account = iterator.next (); + if (!account.is_zero ()) + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_database); + return account; + } + } + + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_none); + return { 0 }; +} + +nano::account nano::bootstrap_ascending::wait_available_account () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + auto account = available_account (); + if (!account.is_zero ()) + { + accounts.timestamp (account); + return account; + } + else + { + condition.wait_for (lock, 100ms); + } + } + return { 0 }; +} + +bool nano::bootstrap_ascending::request (nano::account & account, std::shared_ptr & channel) +{ + async_tag tag{}; + tag.id = generate_id (); + tag.account = account; + tag.time = nano::milliseconds_since_epoch (); + + // Check if the account picked has blocks, if it does, start the pull from the highest block + auto info = store.account.get (store.tx_begin_read (), account); + if (info) + { + tag.type = async_tag::query_type::blocks_by_hash; + tag.start = info->head; + } + else + { + tag.type = async_tag::query_type::blocks_by_account; + tag.start = account; + } + + on_request.notify (tag, channel); + + track (tag); + send (channel, tag); + + return true; // Request sent +} + +bool nano::bootstrap_ascending::run_one () +{ + // Ensure there is enough space in blockprocessor for queuing new blocks + wait_blockprocessor (); + + // Do not do too many requests in parallel, impose throttling + wait_available_request (); + + // Waits for channel that is not full + auto channel = wait_available_channel (); + if (!channel) + { + return false; + } + + // Waits for account either from priority queue or database + auto account = wait_available_account (); + if (account.is_zero ()) + { + return false; + } + + bool success = request (account, channel); + return success; +} + +void nano::bootstrap_ascending::run () +{ + while (!stopped) + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop); + run_one (); + } +} + +void nano::bootstrap_ascending::run_timeouts () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + auto & tags_by_order = tags.get (); + while (!tags_by_order.empty () && nano::time_difference (tags_by_order.front ().time, nano::milliseconds_since_epoch ()) > timeout) + { + auto tag = tags_by_order.front (); + tags_by_order.pop_front (); + on_timeout.notify (tag); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout); + } + condition.wait_for (lock, 1s, [this] () { return stopped; }); + } +} + +void nano::bootstrap_ascending::process (const nano::asc_pull_ack & message) +{ + nano::unique_lock lock{ mutex }; + + // Only process messages that have a known tag + auto & tags_by_id = tags.get (); + if (tags_by_id.count (message.id) > 0) + { + auto iterator = tags_by_id.find (message.id); + auto tag = *iterator; + tags_by_id.erase (iterator); + + lock.unlock (); + + on_reply.notify (tag); + condition.notify_all (); + std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload); + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::missing_tag); + } +} + +void nano::bootstrap_ascending::process (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::async_tag & tag) +{ + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply); + + auto result = verify (response, tag); + switch (result) + { + case verify_result::ok: + { + stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ()); + + for (auto & block : response.blocks) + { + block_processor.add (block); + } + } + break; + case verify_result::nothing_new: + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new); + + { + nano::lock_guard lock{ mutex }; + accounts.priority_down (tag.account); + } + } + break; + case verify_result::invalid: + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid); + // TODO: Log + } + break; + } +} + +void nano::bootstrap_ascending::process (const nano::asc_pull_ack::account_info_payload & response, const nano::bootstrap_ascending::async_tag & tag) +{ + // TODO: Make use of account info +} + +void nano::bootstrap_ascending::process (const nano::empty_payload & response, const nano::bootstrap_ascending::async_tag & tag) +{ + // Should not happen + debug_assert (false, "empty payload"); +} + +nano::bootstrap_ascending::verify_result nano::bootstrap_ascending::verify (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::async_tag & tag) const +{ + auto const & blocks = response.blocks; + + if (blocks.empty ()) + { + return verify_result::nothing_new; + } + if (blocks.size () == 1 && blocks.front ()->hash () == tag.start.as_block_hash ()) + { + return verify_result::nothing_new; + } + + auto const & first = blocks.front (); + switch (tag.type) + { + case async_tag::query_type::blocks_by_hash: + { + if (first->hash () != tag.start.as_block_hash ()) + { + // TODO: Stat & log + return verify_result::invalid; + } + } + break; + case async_tag::query_type::blocks_by_account: + { + // Open & state blocks always contain account field + if (first->account () != tag.start.as_account ()) + { + // TODO: Stat & log + return verify_result::invalid; + } + } + break; + default: + return verify_result::invalid; + } + + // Verify blocks make a valid chain + nano::block_hash previous_hash = blocks.front ()->hash (); + for (int n = 1; n < blocks.size (); ++n) + { + auto & block = blocks[n]; + if (block->previous () != previous_hash) + { + // TODO: Stat & log + return verify_result::invalid; // Blocks do not make a chain + } + previous_hash = block->hash (); + } + + return verify_result::ok; +} + +void nano::bootstrap_ascending::track (async_tag const & tag) +{ + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::track); + + nano::lock_guard lock{ mutex }; + debug_assert (tags.get ().count (tag.id) == 0); + tags.get ().insert (tag); +} + +auto nano::bootstrap_ascending::info () const -> account_sets::info_t +{ + nano::lock_guard lock{ mutex }; + return accounts.info (); +} + +std::unique_ptr nano::bootstrap_ascending::collect_container_info (std::string const & name) +{ + nano::lock_guard lock{ mutex }; + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "tags", tags.size (), sizeof (decltype (tags)::value_type) })); + composite->add_component (accounts.collect_container_info ("accounts")); + return composite; +} diff --git a/nano/node/bootstrap/bootstrap_ascending.hpp b/nano/node/bootstrap/bootstrap_ascending.hpp new file mode 100644 index 00000000..83adcc6d --- /dev/null +++ b/nano/node/bootstrap/bootstrap_ascending.hpp @@ -0,0 +1,329 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace mi = boost::multi_index; + +namespace nano +{ +class block_processor; +class ledger; +class network; + +namespace transport +{ + class channel; +} + +class bootstrap_ascending +{ + using id_t = uint64_t; + +public: + bootstrap_ascending (nano::node &, nano::store &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &); + ~bootstrap_ascending (); + + void start (); + void stop (); + + /** + * Process `asc_pull_ack` message coming from network + */ + void process (nano::asc_pull_ack const & message); + +public: // Container info + std::unique_ptr collect_container_info (std::string const & name); + size_t blocked_size () const; + size_t priority_size () const; + +private: // Dependencies + nano::node & node; + nano::store & store; + nano::block_processor & block_processor; + nano::ledger & ledger; + nano::network & network; + nano::stats & stats; + +public: // async_tag + struct async_tag + { + enum class query_type + { + invalid = 0, // Default initialization + blocks_by_hash, + blocks_by_account, + // TODO: account_info, + }; + + query_type type{ query_type::invalid }; + id_t id{ 0 }; + nano::hash_or_account start{ 0 }; + nano::millis_t time{ 0 }; + nano::account account{ 0 }; + }; + +public: // Events + nano::observer_set &> on_request; + nano::observer_set on_reply; + nano::observer_set on_timeout; + +private: + /* Inspects a block that has been processed by the block processor */ + void inspect (nano::transaction const &, nano::process_return const & result, nano::block const & block); + + void run (); + bool run_one (); + void run_timeouts (); + + /* Limits the number of requests per second we make */ + void wait_available_request (); + /* Throttles requesting new blocks, not to overwhelm blockprocessor */ + void wait_blockprocessor (); + /* Waits for channel with free capacity for bootstrap messages */ + std::shared_ptr wait_available_channel (); + std::shared_ptr available_channel (); + /* Waits until a suitable account outside of cool down period is available */ + nano::account available_account (); + nano::account wait_available_account (); + + bool request (nano::account &, std::shared_ptr &); + void send (std::shared_ptr, async_tag tag); + void track (async_tag const & tag); + + void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag); + void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag); + void process (nano::empty_payload const & response, async_tag const & tag); + + enum class verify_result + { + ok, + nothing_new, + invalid, + }; + + /** + * Verifies whether the received response is valid. Returns: + * - invalid: when received blocks do not correspond to requested hash/account or they do not make a valid chain + * - nothing_new: when received response indicates that the account chain does not have more blocks + * - ok: otherwise, if all checks pass + */ + verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const; + + static id_t generate_id (); + +public: // account_sets + /** This class tracks accounts various account sets which are shared among the multiple bootstrap threads */ + class account_sets + { + public: + explicit account_sets (nano::stats &); + + /** + * If an account is not blocked, increase its priority. + * If the account does not exist in priority set and is not blocked, inserts a new entry. + * Current implementation increases priority by 1.0f each increment + */ + void priority_up (nano::account const & account); + /** + * Decreases account priority + * Current implementation divides priority by 2.0f and saturates down to 1.0f. + */ + void priority_down (nano::account const & account); + void block (nano::account const & account, nano::block_hash const & dependency); + void unblock (nano::account const & account, std::optional const & hash = std::nullopt); + void timestamp (nano::account const & account, bool reset = false); + + nano::account next (); + + public: + bool blocked (nano::account const & account) const; + std::size_t priority_size () const; + std::size_t blocked_size () const; + /** + * Accounts in the ledger but not in priority list are assumed priority 1.0f + * Blocked accounts are assumed priority 0.0f + */ + float priority (nano::account const & account) const; + + public: // Container info + std::unique_ptr collect_container_info (std::string const & name); + + private: + void trim_overflow (); + bool check_timestamp (nano::account const & account) const; + + private: // Dependencies + nano::stats & stats; + + private: + struct priority_entry + { + nano::account account{ 0 }; + float priority{ 0 }; + nano::millis_t timestamp{ 0 }; + id_t id{ 0 }; // Uniformly distributed, used for random querying + + priority_entry (nano::account account, float priority); + }; + + struct blocking_entry + { + nano::account account{ 0 }; + nano::block_hash dependency{ 0 }; + priority_entry original_entry{ 0, 0 }; + + float priority () const + { + return original_entry.priority; + } + }; + + // clang-format off + class tag_account {}; + class tag_priority {}; + class tag_sequenced {}; + class tag_id {}; + + // Tracks the ongoing account priorities + // This only stores account priorities > 1.0f. + using ordered_priorities = boost::multi_index_container>, + mi::ordered_unique, + mi::member>, + mi::ordered_non_unique, + mi::member>, + mi::ordered_unique, + mi::member> + >>; + + // A blocked account is an account that has failed to insert a new block because the source block is not currently present in the ledger + // An account is unblocked once it has a block successfully inserted + using ordered_blocking = boost::multi_index_container>, + mi::ordered_unique, + mi::member>, + mi::ordered_non_unique, + mi::const_mem_fun> + >>; + // clang-format on + + ordered_priorities priorities; + ordered_blocking blocking; + + std::default_random_engine rng; + + private: // TODO: Move into config + static std::size_t constexpr consideration_count = 4; + static std::size_t constexpr priorities_max = 256 * 1024; + static std::size_t constexpr blocking_max = 256 * 1024; + static nano::millis_t constexpr cooldown = 3 * 1000; + + public: // Consts + static float constexpr priority_initial = 8.0f; + static float constexpr priority_increase = 2.0f; + static float constexpr priority_decrease = 0.5f; + static float constexpr priority_max = 32.0f; + static float constexpr priority_cutoff = 1.0f; + + public: + using info_t = std::tuple; // + info_t info () const; + }; + + account_sets::info_t info () const; + +private: // Database iterators + class database_iterator + { + public: + enum class table_type + { + account, + pending + }; + + explicit database_iterator (nano::store & store, table_type); + nano::account operator* () const; + void next (nano::transaction & tx); + + private: + nano::store & store; + nano::account current{ 0 }; + const table_type table; + }; + + class buffered_iterator + { + public: + explicit buffered_iterator (nano::store & store); + nano::account operator* () const; + nano::account next (); + + private: + void fill (); + + private: + nano::store & store; + std::deque buffer; + + database_iterator accounts_iterator; + database_iterator pending_iterator; + + static std::size_t constexpr size = 1024; + }; + +private: + account_sets accounts; + buffered_iterator iterator; + + // clang-format off + class tag_sequenced {}; + class tag_id {}; + class tag_account {}; + + using ordered_tags = boost::multi_index_container>, + mi::hashed_unique, + mi::member>, + mi::hashed_non_unique, + mi::member> + >>; + // clang-format on + ordered_tags tags; + + nano::bandwidth_limiter limiter; + // Requests for accounts from database have much lower hitrate and could introduce strain on the network + // A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue + nano::bandwidth_limiter database_limiter; + + bool stopped{ false }; + mutable nano::mutex mutex; + mutable nano::condition_variable condition; + std::thread thread; + std::thread timeout_thread; + +private: // TODO: Move into config + static std::size_t constexpr requests_limit{ 1024 * 4 }; + static std::size_t constexpr database_requests_limit{ 1024 }; + static std::size_t constexpr pull_count{ nano::bootstrap_server::max_blocks }; + static nano::millis_t constexpr timeout{ 1000 * 3 }; +}; +} diff --git a/nano/node/bootstrap/bootstrap_attempt.cpp b/nano/node/bootstrap/bootstrap_attempt.cpp index 58990698..de9ca3e6 100644 --- a/nano/node/bootstrap/bootstrap_attempt.cpp +++ b/nano/node/bootstrap/bootstrap_attempt.cpp @@ -99,6 +99,8 @@ char const * nano::bootstrap_attempt::mode_text () return "lazy"; case nano::bootstrap_mode::wallet_lazy: return "wallet_lazy"; + case nano::bootstrap_mode::ascending: + return "ascending"; } return "unknown"; } From 1d1d1526969d6917d842cb5da891398acbb2b1e6 Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Sun, 12 Mar 2023 21:57:48 +0000 Subject: [PATCH 3/5] Connecting ascending bootstrap client to node. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Piotr Wójcik <3044353+pwojcikdev@users.noreply.github.com> Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com> --- nano/node/bootstrap/bootstrap_bulk_pull.cpp | 2 +- nano/node/cli.cpp | 2 + nano/node/json_handler.cpp | 36 ++++ nano/node/json_handler.hpp | 1 + nano/node/network.cpp | 3 +- nano/node/network.hpp | 2 +- nano/node/node.cpp | 11 ++ nano/node/node.hpp | 2 + nano/node/nodeconfig.hpp | 5 +- nano/node/transport/tcp.cpp | 5 + nano/rpc/rpc_handler.cpp | 1 + nano/slow_test/CMakeLists.txt | 3 +- nano/slow_test/bootstrap.cpp | 200 ++++++++++++++++++++ 13 files changed, 267 insertions(+), 6 deletions(-) create mode 100644 nano/slow_test/bootstrap.cpp diff --git a/nano/node/bootstrap/bootstrap_bulk_pull.cpp b/nano/node/bootstrap/bootstrap_bulk_pull.cpp index ffcd38e8..6f3ff097 100644 --- a/nano/node/bootstrap/bootstrap_bulk_pull.cpp +++ b/nano/node/bootstrap/bootstrap_bulk_pull.cpp @@ -348,7 +348,7 @@ void nano::bulk_pull_server::set_current_end () connection->node->logger.try_log (boost::str (boost::format ("Bulk pull request for block hash: %1%") % request->start.to_string ())); } - current = request->start.as_block_hash (); + current = ascending () ? connection->node->store.block.successor (transaction, request->start.as_block_hash ()) : request->start.as_block_hash (); include_start = true; } else diff --git a/nano/node/cli.cpp b/nano/node/cli.cpp index db25e74f..5dd2c39f 100644 --- a/nano/node/cli.cpp +++ b/nano/node/cli.cpp @@ -94,6 +94,7 @@ void nano::add_node_flag_options (boost::program_options::options_description & ("disable_legacy_bootstrap", "Disables legacy bootstrap") ("disable_wallet_bootstrap", "Disables wallet lazy bootstrap") ("disable_ongoing_bootstrap", "Disable ongoing bootstrap") + ("disable_ascending_bootstrap", "Disable ascending bootstrap") ("disable_rep_crawler", "Disable rep crawler") ("disable_request_loop", "Disable request loop") ("disable_bootstrap_listener", "Disables bootstrap processing for TCP listener (not including realtime network TCP connections)") @@ -122,6 +123,7 @@ std::error_code nano::update_flags (nano::node_flags & flags_a, boost::program_o flags_a.disable_legacy_bootstrap = (vm.count ("disable_legacy_bootstrap") > 0); flags_a.disable_wallet_bootstrap = (vm.count ("disable_wallet_bootstrap") > 0); flags_a.disable_ongoing_bootstrap = (vm.count ("disable_ongoing_bootstrap") > 0); + flags_a.disable_ascending_bootstrap = (vm.count ("disable_ascending_bootstrap") > 0); flags_a.disable_rep_crawler = (vm.count ("disable_rep_crawler") > 0); flags_a.disable_request_loop = (vm.count ("disable_request_loop") > 0); if (!flags_a.inactive_node) diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index fa6ac917..b2e027c0 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -5220,6 +5221,40 @@ void nano::json_handler::populate_backlog () response_errors (); } +void nano::json_handler::debug_bootstrap_priority_info () +{ + if (!ec) + { + auto [blocking, priorities] = node.ascendboot.info (); + + // priorities + { + boost::property_tree::ptree response_priorities; + for (auto const & entry : priorities) + { + const auto account = entry.account; + const auto priority = entry.priority; + + response_priorities.put (account.to_account (), priority); + } + response_l.add_child ("priorities", response_priorities); + } + // blocking + { + boost::property_tree::ptree response_blocking; + for (auto const & entry : blocking) + { + const auto account = entry.account; + const auto dependency = entry.dependency; + + response_blocking.put (account.to_account (), dependency.to_string ()); + } + response_l.add_child ("blocking", response_blocking); + } + } + response_errors (); +} + void nano::inprocess_rpc_handler::process_request (std::string const &, std::string const & body_a, std::function response_a) { // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler @@ -5385,6 +5420,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map () no_arg_funcs.emplace ("work_peers", &nano::json_handler::work_peers); no_arg_funcs.emplace ("work_peers_clear", &nano::json_handler::work_peers_clear); no_arg_funcs.emplace ("populate_backlog", &nano::json_handler::populate_backlog); + no_arg_funcs.emplace ("debug_bootstrap_priority_info", &nano::json_handler::debug_bootstrap_priority_info); return no_arg_funcs; } diff --git a/nano/node/json_handler.hpp b/nano/node/json_handler.hpp index 4d2a055d..60a257e3 100644 --- a/nano/node/json_handler.hpp +++ b/nano/node/json_handler.hpp @@ -65,6 +65,7 @@ public: void confirmation_info (); void confirmation_quorum (); void confirmation_height_currently_processing (); + void debug_bootstrap_priority_info (); void database_txn_tracker (); void delegators (); void delegators_count (); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 9e3e7a5f..336e9ac1 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -511,7 +512,7 @@ public: void asc_pull_ack (nano::asc_pull_ack const & message) override { - // TODO: Process in ascending bootstrap client + node.ascendboot.process (message); } private: diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 35940208..0253496e 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -111,7 +111,7 @@ public: void random_fill (std::array &) const; void fill_keepalive_self (std::array &) const; // Note: The minimum protocol version is used after the random selection, so number of peers can be less than expected. - std::unordered_set> random_set (std::size_t, uint8_t = 0, bool = false) const; + std::unordered_set> random_set (std::size_t count, uint8_t min_version = 0, bool include_temporary_channels = false) const; // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (); nano::endpoint endpoint () const; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index e20a4eb3..e233eddc 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -201,6 +201,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co aggregator (config, stats, generator, final_generator, history, ledger, wallets, active), wallets (wallets_store.init_error (), *this), backlog{ nano::backlog_population_config (config), store, stats }, + ascendboot{ *this, store, block_processor, ledger, network, stats }, websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger }, epoch_upgrader{ *this, ledger, store, network_params, logger }, startup_time (std::chrono::steady_clock::now ()), @@ -584,6 +585,8 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (node.inactive_vote_cache.collect_container_info ("inactive_vote_cache")); composite->add_component (collect_container_info (node.generator, "vote_generator")); composite->add_component (collect_container_info (node.final_generator, "vote_generator_final")); + composite->add_component (node.ascendboot.collect_container_info ("bootstrap_ascending")); + composite->add_component (node.unchecked.collect_container_info ("unchecked")); return composite; } @@ -695,6 +698,10 @@ void nano::node::start () backlog.start (); hinting.start (); bootstrap_server.start (); + if (!flags.disable_ascending_bootstrap) + { + ascendboot.start (); + } websocket.start (); telemetry.start (); } @@ -713,6 +720,10 @@ void nano::node::stop () // No tasks may wait for work generation in I/O threads, or termination signal capturing will be unable to call node::stop() distributed_work.stop (); backlog.stop (); + if (!flags.disable_ascending_bootstrap) + { + ascendboot.stop (); + } unchecked.stop (); block_processor.stop (); aggregator.stop (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 987dce2e..37a0669f 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -192,6 +193,7 @@ public: nano::request_aggregator aggregator; nano::wallets wallets; nano::backlog_population backlog; + nano::bootstrap_ascending ascendboot; nano::websocket_server websocket; nano::epoch_upgrader epoch_upgrader; nano::block_broadcast block_broadcast; diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 9b59a439..56bd310b 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -101,8 +101,8 @@ public: std::size_t bandwidth_limit{ 10 * 1024 * 1024 }; /** By default, allow bursts of 15MB/s (not sustainable) */ double bandwidth_limit_burst_ratio{ 3. }; - /** Default boostrap outbound traffic limit is 16MB/s ~ 128Mbit/s */ - std::size_t bootstrap_bandwidth_limit{ 16 * 1024 * 1024 }; + /** Default boostrap outbound traffic limit is 5MB/s */ + std::size_t bootstrap_bandwidth_limit{ 5 * 1024 * 1024 }; /** Bootstrap traffic does not need bursts */ double bootstrap_bandwidth_burst_ratio{ 1. }; std::chrono::milliseconds conf_height_processor_batch_min_time{ 50 }; @@ -140,6 +140,7 @@ public: bool disable_bootstrap_bulk_pull_server{ false }; bool disable_bootstrap_bulk_push_client{ false }; bool disable_ongoing_bootstrap{ false }; // For testing only + bool disable_ascending_bootstrap{ true }; bool disable_rep_crawler{ false }; bool disable_request_loop{ false }; // For testing only bool disable_tcp_realtime{ false }; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 676bb0dd..db828cd5 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -189,6 +189,11 @@ std::unordered_set> nano::transport::t auto index (nano::random_pool::generate_word32 (0, static_cast (peers_size - 1))); auto channel = channels.get ()[index].channel; + if (!channel->alive ()) + { + continue; + } + if (channel->get_network_version () >= min_version && (include_temporary_channels_a || !channel->temporary)) { result.insert (channel); diff --git a/nano/rpc/rpc_handler.cpp b/nano/rpc/rpc_handler.cpp index 69e95d00..7148d45d 100644 --- a/nano/rpc/rpc_handler.cpp +++ b/nano/rpc/rpc_handler.cpp @@ -153,6 +153,7 @@ std::unordered_set create_rpc_control_impls () set.emplace ("account_remove"); set.emplace ("account_representative_set"); set.emplace ("accounts_create"); + set.emplace ("backoff_info"); set.emplace ("block_create"); set.emplace ("bootstrap_lazy"); set.emplace ("confirmation_height_currently_processing"); diff --git a/nano/slow_test/CMakeLists.txt b/nano/slow_test/CMakeLists.txt index 564af6e0..c5bbedda 100644 --- a/nano/slow_test/CMakeLists.txt +++ b/nano/slow_test/CMakeLists.txt @@ -1,4 +1,5 @@ -add_executable(slow_test entry.cpp node.cpp vote_cache.cpp vote_processor.cpp) +add_executable(slow_test entry.cpp node.cpp vote_cache.cpp vote_processor.cpp + bootstrap.cpp) target_link_libraries(slow_test secure node test_common gtest libminiupnpc-static) diff --git a/nano/slow_test/bootstrap.cpp b/nano/slow_test/bootstrap.cpp new file mode 100644 index 00000000..63095e8d --- /dev/null +++ b/nano/slow_test/bootstrap.cpp @@ -0,0 +1,200 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include +#include + +using namespace std::chrono_literals; + +namespace +{ +void wait_for_key () +{ + int junk; + std::cin >> junk; +} + +class rpc_wrapper +{ +public: + rpc_wrapper (nano::test::system & system, nano::node & node, uint16_t port) : + node_rpc_config{}, + rpc_config{ node.network_params.network, port, true }, + ipc{ node, node_rpc_config }, + ipc_rpc_processor{ system.io_ctx, rpc_config }, + rpc{ system.io_ctx, rpc_config, ipc_rpc_processor } + { + } + + void start () + { + rpc.start (); + } + +public: + nano::node_rpc_config node_rpc_config; + nano::rpc_config rpc_config; + nano::ipc::ipc_server ipc; + nano::ipc_rpc_processor ipc_rpc_processor; + nano::rpc rpc; +}; + +std::unique_ptr start_rpc (nano::test::system & system, nano::node & node, uint16_t port) +{ + auto rpc = std::make_unique (system, node, port); + rpc->start (); + return rpc; +} +} + +TEST (bootstrap_ascending, profile) +{ + nano::test::system system; + nano::thread_runner runner{ system.io_ctx, 2 }; + nano::networks network = nano::networks::nano_beta_network; + nano::network_params network_params{ network }; + + // Set up client and server nodes + nano::node_config config_server{ network_params }; + config_server.preconfigured_peers.clear (); + config_server.bandwidth_limit = 0; // Unlimited server bandwidth + nano::node_flags flags_server; + flags_server.disable_legacy_bootstrap = true; + flags_server.disable_wallet_bootstrap = true; + flags_server.disable_add_initial_peers = true; + flags_server.disable_ongoing_bootstrap = true; + flags_server.disable_ascending_bootstrap = true; + auto data_path_server = nano::working_path (network); + //auto data_path_server = ""; + auto server = std::make_shared (system.io_ctx, data_path_server, config_server, system.work, flags_server); + system.nodes.push_back (server); + server->start (); + + nano::node_config config_client{ network_params }; + config_client.preconfigured_peers.clear (); + config_client.bandwidth_limit = 0; // Unlimited server bandwidth + nano::node_flags flags_client; + flags_client.disable_legacy_bootstrap = true; + flags_client.disable_wallet_bootstrap = true; + flags_client.disable_add_initial_peers = true; + flags_client.disable_ongoing_bootstrap = true; + config_client.ipc_config.transport_tcp.enabled = true; + // Disable database integrity safety for higher throughput + config_client.lmdb_config.sync = nano::lmdb_config::sync_strategy::nosync_unsafe; + //auto client = system.add_node (config_client, flags_client); + + // macos 16GB RAM disk: diskutil erasevolume HFS+ "RAMDisk" `hdiutil attach -nomount ram://33554432` + //auto data_path_client = "/Volumes/RAMDisk"; + auto data_path_client = nano::unique_path (); + auto client = std::make_shared (system.io_ctx, data_path_client, config_client, system.work, flags_client); + system.nodes.push_back (client); + client->start (); + + // Set up RPC + auto client_rpc = start_rpc (system, *server, 55000); + auto server_rpc = start_rpc (system, *client, 55001); + + struct entry + { + nano::bootstrap_ascending::async_tag tag; + std::shared_ptr request_channel; + std::shared_ptr reply_channel; + + bool replied{ false }; + bool received{ false }; + }; + + nano::mutex mutex; + std::unordered_map requests; + + server->bootstrap_server.on_response.add ([&] (auto & response, auto & channel) { + nano::lock_guard lock{ mutex }; + + if (requests.count (response.id)) + { + requests[response.id].replied = true; + requests[response.id].reply_channel = channel; + } + else + { + std::cerr << "unknown response: " << response.id << std::endl; + } + }); + + client->ascendboot.on_request.add ([&] (auto & tag, auto & channel) { + nano::lock_guard lock{ mutex }; + + requests[tag.id] = { tag, channel }; + }); + + client->ascendboot.on_reply.add ([&] (auto & tag) { + nano::lock_guard lock{ mutex }; + + requests[tag.id].received = true; + }); + + /*client->ascendboot.on_timeout.add ([&] (auto & tag) { + nano::lock_guard lock{ mutex }; + + if (requests.count (tag.id)) + { + auto entry = requests[tag.id]; + + std::cerr << "timeout: " + << "replied: " << entry.replied + << " | " + << "recevied: " << entry.received + << " | " + << "request: " << entry.request_channel->to_string () + << " ||| " + << "reply: " << (entry.reply_channel ? entry.reply_channel->to_string () : "null") + << std::endl; + } + else + { + std::cerr << "unknown timeout: " << tag.id << std::endl; + } + });*/ + + std::cout << "server count: " << server->ledger.cache.block_count << std::endl; + + nano::test::rate_observer rate; + rate.observe ("count", [&] () { return client->ledger.cache.block_count.load (); }); + rate.observe ("unchecked", [&] () { return client->unchecked.count (); }); + rate.observe ("block_processor", [&] () { return client->block_processor.size (); }); + rate.observe ("priority", [&] () { return client->ascendboot.priority_size (); }); + rate.observe ("blocking", [&] () { return client->ascendboot.blocked_size (); }); + rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out); + rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::reply, nano::stat::dir::in); + rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in); + rate.observe (*server, nano::stat::type::bootstrap_server, nano::stat::detail::blocks, nano::stat::dir::out); + rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::old, nano::stat::dir::in); + rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::gap_epoch_open_pending, nano::stat::dir::in); + rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::gap_source, nano::stat::dir::in); + rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::gap_previous, nano::stat::dir::in); + rate.background_print (3s); + + //wait_for_key (); + while (true) + { + nano::test::establish_tcp (system, *client, server->network.endpoint ()); + std::this_thread::sleep_for (10s); + } + + server->stop (); + client->stop (); +} From 87f61d83821f324eddcde1ea906a6fc287991a1b Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Sun, 5 Mar 2023 15:19:47 +0000 Subject: [PATCH 4/5] Enabling ascending bootstrap. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Piotr Wójcik <3044353+pwojcikdev@users.noreply.github.com> Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com> --- nano/node/bootstrap/bootstrap_connections.cpp | 6 +++--- nano/node/nodeconfig.hpp | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nano/node/bootstrap/bootstrap_connections.cpp b/nano/node/bootstrap/bootstrap_connections.cpp index d87d4bd9..ce901561 100644 --- a/nano/node/bootstrap/bootstrap_connections.cpp +++ b/nano/node/bootstrap/bootstrap_connections.cpp @@ -172,8 +172,8 @@ void nano::bootstrap_connections::connect_client (nano::tcp_endpoint const & end case boost::system::errc::connection_refused: case boost::system::errc::operation_canceled: case boost::system::errc::timed_out: - case 995: //Windows The I/O operation has been aborted because of either a thread exit or an application request - case 10061: //Windows No connection could be made because the target machine actively refused it + case 995: // Windows The I/O operation has been aborted because of either a thread exit or an application request + case 10061: // Windows No connection could be made because the target machine actively refused it break; } } @@ -286,7 +286,7 @@ void nano::bootstrap_connections::populate_connections (bool repeat) // Not many peers respond, need to try to make more connections than we need. for (auto i = 0u; i < delta; i++) { - auto endpoint (node.network.bootstrap_peer ()); + auto endpoint (node.network.bootstrap_peer ()); // Legacy bootstrap is compatible with older version of protocol if (endpoint != nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0) && (node.flags.allow_bootstrap_peers_duplicates || endpoints.find (endpoint) == endpoints.end ()) && !node.network.excluded_peers.check (endpoint)) { connect_client (endpoint); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 56bd310b..83826a21 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -140,7 +140,7 @@ public: bool disable_bootstrap_bulk_pull_server{ false }; bool disable_bootstrap_bulk_push_client{ false }; bool disable_ongoing_bootstrap{ false }; // For testing only - bool disable_ascending_bootstrap{ true }; + bool disable_ascending_bootstrap{ false }; bool disable_rep_crawler{ false }; bool disable_request_loop{ false }; // For testing only bool disable_tcp_realtime{ false }; From 2807ceecb4a1bc19afcb29f96da392998227538e Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Tue, 14 Mar 2023 14:14:19 +0000 Subject: [PATCH 5/5] Lowering requests_limit to 128 from 4096 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Piotr Wójcik <3044353+pwojcikdev@users.noreply.github.com> Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com> --- nano/node/bootstrap/bootstrap_ascending.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/bootstrap/bootstrap_ascending.hpp b/nano/node/bootstrap/bootstrap_ascending.hpp index 83adcc6d..62bac5b6 100644 --- a/nano/node/bootstrap/bootstrap_ascending.hpp +++ b/nano/node/bootstrap/bootstrap_ascending.hpp @@ -321,7 +321,7 @@ private: std::thread timeout_thread; private: // TODO: Move into config - static std::size_t constexpr requests_limit{ 1024 * 4 }; + static std::size_t constexpr requests_limit{ 128 }; static std::size_t constexpr database_requests_limit{ 1024 }; static std::size_t constexpr pull_count{ nano::bootstrap_server::max_blocks }; static nano::millis_t constexpr timeout{ 1000 * 3 };