diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index dcd6734d..7c891609 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable( block_store.cpp blockprocessor.cpp bootstrap.cpp + bootstrap_ascending.cpp bootstrap_server.cpp cli.cpp confirmation_height.cpp diff --git a/nano/core_test/block_store.cpp b/nano/core_test/block_store.cpp index c37a8e4a..52396d51 100644 --- a/nano/core_test/block_store.cpp +++ b/nano/core_test/block_store.cpp @@ -450,9 +450,8 @@ TEST (block_store, empty_bootstrap) auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); nano::unchecked_map unchecked{ *store, system.stats, false }; ASSERT_TRUE (!store->init_error ()); - auto transaction (store->tx_begin_read ()); size_t count = 0; - unchecked.for_each (transaction, [&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + unchecked.for_each ([&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) { ++count; }); ASSERT_EQ (count, 0); diff --git a/nano/core_test/bootstrap.cpp b/nano/core_test/bootstrap.cpp index 899fa4ed..f2a13dc9 100644 --- a/nano/core_test/bootstrap.cpp +++ b/nano/core_test/bootstrap.cpp @@ -127,12 +127,12 @@ TEST (bulk_pull, ascending_one_hash) auto connection = std::make_shared (socket, system.nodes[0]); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis->hash (); - req->end = nano::dev::genesis->hash (); + req->end.clear (); req->header.flag_set (nano::message_header::bulk_pull_ascending_flag); auto request = std::make_shared (connection, std::move (req)); auto block_out1 = request->get_next (); ASSERT_NE (nullptr, block_out1); - ASSERT_EQ (block_out1->hash (), nano::dev::genesis->hash ()); + ASSERT_EQ (block_out1->hash (), block1->hash ()); ASSERT_EQ (nullptr, request->get_next ()); } @@ -158,7 +158,7 @@ TEST (bulk_pull, ascending_two_account) auto socket = std::make_shared (node, nano::transport::socket::endpoint_type_t::server); auto connection = std::make_shared (socket, system.nodes[0]); auto req = std::make_unique (nano::dev::network_params.network); - req->start = nano::dev::genesis->hash (); + req->start = nano::dev::genesis->account (); req->end.clear (); req->header.flag_set (nano::message_header::bulk_pull_ascending_flag); auto request = std::make_shared (connection, std::move (req)); @@ -172,7 +172,7 @@ TEST (bulk_pull, ascending_two_account) } /** - Tests that the `end' value is respected in the bulk_pull message + Tests that the `end' value is respected in the bulk_pull message when the ascending flag is used. */ TEST (bulk_pull, ascending_end) { @@ -1351,7 +1351,7 @@ TEST (bootstrap_processor, lazy_pruning_missing_block) ASSERT_FALSE (node2->ledger.block_or_pruned_exists (state_open->hash ())); { auto transaction (node2->store.tx_begin_read ()); - ASSERT_TRUE (node2->unchecked.exists (transaction, nano::unchecked_key (send2->root ().as_block_hash (), send2->hash ()))); + ASSERT_TRUE (node2->unchecked.exists (nano::unchecked_key (send2->root ().as_block_hash (), send2->hash ()))); } // Insert missing block node2->process_active (send1); @@ -2045,7 +2045,7 @@ TEST (bulk, DISABLED_genesis_pruning) ASSERT_EQ (1, node2->ledger.cache.block_count); { auto transaction (node2->store.tx_begin_write ()); - node2->unchecked.clear (transaction); + node2->unchecked.clear (); } // Insert pruned blocks node2->process_active (send1); diff --git a/nano/core_test/bootstrap_ascending.cpp b/nano/core_test/bootstrap_ascending.cpp new file mode 100644 index 00000000..6eba2795 --- /dev/null +++ b/nano/core_test/bootstrap_ascending.cpp @@ -0,0 +1,252 @@ +#include +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +namespace +{ +nano::block_hash random_hash () +{ + nano::block_hash random_hash; + nano::random_pool::generate_block (random_hash.bytes.data (), random_hash.bytes.size ()); + return random_hash; +} +} + +TEST (account_sets, construction) +{ + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; +} + +TEST (account_sets, empty_blocked) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + ASSERT_FALSE (sets.blocked (account)); +} + +TEST (account_sets, block) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + sets.block (account, random_hash ()); + ASSERT_TRUE (sets.blocked (account)); +} + +TEST (account_sets, unblock) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + auto hash = random_hash (); + sets.block (account, hash); + sets.unblock (account, hash); + ASSERT_FALSE (sets.blocked (account)); +} + +TEST (account_sets, priority_base) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + ASSERT_EQ (1.0f, sets.priority (account)); +} + +TEST (account_sets, priority_blocked) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + sets.block (account, random_hash ()); + ASSERT_EQ (0.0f, sets.priority (account)); +} + +// When account is unblocked, check that it retains it former priority +TEST (account_sets, priority_unblock_keep) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + sets.priority_up (account); + sets.priority_up (account); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial * nano::bootstrap_ascending::account_sets::priority_increase); + auto hash = random_hash (); + sets.block (account, hash); + ASSERT_EQ (0.0f, sets.priority (account)); + sets.unblock (account, hash); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial * nano::bootstrap_ascending::account_sets::priority_increase); +} + +TEST (account_sets, priority_up_down) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + sets.priority_up (account); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial); + sets.priority_down (account); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial - nano::bootstrap_ascending::account_sets::priority_decrease); +} + +// Check that priority downward saturates to 1.0f +TEST (account_sets, priority_down_sat) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + sets.priority_down (account); + ASSERT_EQ (1.0f, sets.priority (account)); +} + +// Ensure priority value is bounded +TEST (account_sets, saturate_priority) +{ + nano::account account{ 1 }; + nano::stats stats; + nano::logger_mt logger; + auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants); + ASSERT_FALSE (store->init_error ()); + nano::bootstrap_ascending::account_sets sets{ stats }; + for (int n = 0; n < 1000; ++n) + { + sets.priority_up (account); + } + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_max); +} + +/** + * Tests the base case for returning + */ +TEST (bootstrap_ascending, account_base) +{ + nano::node_flags flags; + nano::test::system system{ 1, nano::transport::transport_type::tcp, flags }; + auto & node0 = *system.nodes[0]; + nano::state_block_builder builder; + auto send1 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .link (0) + .balance (nano::dev::constants.genesis_amount - 1) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build_shared (); + ASSERT_EQ (nano::process_result::progress, node0.process (*send1).code); + auto & node1 = *system.add_node (flags); + ASSERT_TIMELY (5s, node1.block (send1->hash ()) != nullptr); +} + +/** + * Tests that bootstrap_ascending will return multiple new blocks in-order + */ +TEST (bootstrap_ascending, account_inductive) +{ + nano::node_flags flags; + nano::test::system system{ 1, nano::transport::transport_type::tcp, flags }; + auto & node0 = *system.nodes[0]; + nano::state_block_builder builder; + auto send1 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .link (0) + .balance (nano::dev::constants.genesis_amount - 1) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build_shared (); + auto send2 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (send1->hash ()) + .representative (nano::dev::genesis_key.pub) + .link (0) + .balance (nano::dev::constants.genesis_amount - 2) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (send1->hash ())) + .build_shared (); + // std::cerr << "Genesis: " << nano::dev::genesis->hash ().to_string () << std::endl; + // std::cerr << "Send1: " << send1->hash ().to_string () << std::endl; + // std::cerr << "Send2: " << send2->hash ().to_string () << std::endl; + ASSERT_EQ (nano::process_result::progress, node0.process (*send1).code); + ASSERT_EQ (nano::process_result::progress, node0.process (*send2).code); + auto & node1 = *system.add_node (flags); + ASSERT_TIMELY (50s, node1.block (send2->hash ()) != nullptr); +} + +/** + * Tests that bootstrap_ascending will return multiple new blocks in-order + */ +TEST (bootstrap_ascending, trace_base) +{ + nano::node_flags flags; + flags.disable_legacy_bootstrap = true; + nano::test::system system{ 1, nano::transport::transport_type::tcp, flags }; + auto & node0 = *system.nodes[0]; + nano::keypair key; + nano::state_block_builder builder; + auto send1 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .link (key.pub) + .balance (nano::dev::constants.genesis_amount - 1) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build_shared (); + auto receive1 = builder.make_block () + .account (key.pub) + .previous (0) + .representative (nano::dev::genesis_key.pub) + .link (send1->hash ()) + .balance (1) + .sign (key.prv, key.pub) + .work (*system.work.generate (key.pub)) + .build_shared (); + // std::cerr << "Genesis key: " << nano::dev::genesis_key.pub.to_account () << std::endl; + // std::cerr << "Key: " << key.pub.to_account () << std::endl; + // std::cerr << "Genesis: " << nano::dev::genesis->hash ().to_string () << std::endl; + // std::cerr << "send1: " << send1->hash ().to_string () << std::endl; + // std::cerr << "receive1: " << receive1->hash ().to_string () << std::endl; + auto & node1 = *system.add_node (); + // std::cerr << "--------------- Start ---------------\n"; + ASSERT_EQ (nano::process_result::progress, node0.process (*send1).code); + ASSERT_EQ (nano::process_result::progress, node0.process (*receive1).code); + ASSERT_EQ (node1.store.pending.begin (node1.store.tx_begin_read (), nano::pending_key{ key.pub, 0 }), node1.store.pending.end ()); + // std::cerr << "node0: " << node0.network.endpoint () << std::endl; + // std::cerr << "node1: " << node1.network.endpoint () << std::endl; + ASSERT_TIMELY (10s, node1.block (receive1->hash ()) != nullptr); +} diff --git a/nano/core_test/confirmation_height.cpp b/nano/core_test/confirmation_height.cpp index 124dbacc..ff2698d5 100644 --- a/nano/core_test/confirmation_height.cpp +++ b/nano/core_test/confirmation_height.cpp @@ -391,14 +391,14 @@ TEST (confirmation_height, gap_bootstrap) node1.process_active (receive2); // Waits for the unchecked_map to process the 4 blocks added to the block_processor, saving them in the unchecked table auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return !node1.unchecked.get (transaction_a, block_hash_a).empty (); + return !node1.unchecked.get (block_hash_a).empty (); }; ASSERT_TIMELY (5s, check_block_is_listed (node1.store.tx_begin_read (), receive2->previous ())); // Confirmation heights should not be updated { auto transaction (node1.store.tx_begin_read ()); - auto unchecked_count (node1.unchecked.count (transaction)); + auto unchecked_count (node1.unchecked.count ()); ASSERT_EQ (unchecked_count, 2); nano::confirmation_height_info confirmation_height_info; @@ -410,7 +410,7 @@ TEST (confirmation_height, gap_bootstrap) // Now complete the chain where the block comes in on the bootstrap network. node1.block_processor.add (open1); - ASSERT_TIMELY (5s, node1.unchecked.count (node1.store.tx_begin_read ()) == 0); + ASSERT_TIMELY (5s, node1.unchecked.count () == 0); // Confirmation height should be unchanged and unchecked should now be 0 { auto transaction = node1.store.tx_begin_read (); @@ -544,7 +544,7 @@ TEST (confirmation_height, gap_live) // This should confirm the open block and the source of the receive blocks auto transaction = node->store.tx_begin_read (); - auto unchecked_count = node->unchecked.count (transaction); + auto unchecked_count = node->unchecked.count (); ASSERT_EQ (unchecked_count, 0); nano::confirmation_height_info confirmation_height_info{}; diff --git a/nano/core_test/ledger.cpp b/nano/core_test/ledger.cpp index 8efab6b9..666707d1 100644 --- a/nano/core_test/ledger.cpp +++ b/nano/core_test/ledger.cpp @@ -3999,10 +3999,10 @@ TEST (ledger, epoch_open_pending) ASSERT_EQ (nano::process_result::gap_epoch_open_pending, process_result.code); node1.block_processor.add (epoch_open); // Waits for the block to get saved in the database - ASSERT_TIMELY (10s, 1 == node1.unchecked.count (node1.store.tx_begin_read ())); + ASSERT_TIMELY (10s, 1 == node1.unchecked.count ()); ASSERT_FALSE (node1.ledger.block_or_pruned_exists (epoch_open->hash ())); // Open block should be inserted into unchecked - auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), nano::hash_or_account (epoch_open->account ()).hash); + auto blocks = node1.unchecked.get (nano::hash_or_account (epoch_open->account ()).hash); ASSERT_EQ (blocks.size (), 1); ASSERT_EQ (blocks[0].block->full_hash (), epoch_open->full_hash ()); // New block to process epoch open @@ -4293,8 +4293,8 @@ TEST (ledger, unchecked_epoch) node1.block_processor.add (epoch1); { // Waits for the epoch1 block to pass through block_processor and unchecked.put queues - ASSERT_TIMELY (10s, 1 == node1.unchecked.count (node1.store.tx_begin_read ())); - auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), epoch1->previous ()); + ASSERT_TIMELY (10s, 1 == node1.unchecked.count ()); + auto blocks = node1.unchecked.get (epoch1->previous ()); ASSERT_EQ (blocks.size (), 1); } node1.block_processor.add (send1); @@ -4302,7 +4302,7 @@ TEST (ledger, unchecked_epoch) ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), epoch1->hash ())); { // Waits for the last blocks to pass through block_processor and unchecked.put queues - ASSERT_TIMELY (10s, 0 == node1.unchecked.count (node1.store.tx_begin_read ())); + ASSERT_TIMELY (10s, 0 == node1.unchecked.count ()); auto info = node1.ledger.account_info (node1.store.tx_begin_read (), destination.pub); ASSERT_TRUE (info); ASSERT_EQ (info->epoch (), nano::epoch::epoch_1); @@ -4367,8 +4367,8 @@ TEST (ledger, unchecked_epoch_invalid) node1.block_processor.add (epoch2); { // Waits for the last blocks to pass through block_processor and unchecked.put queues - ASSERT_TIMELY (10s, 2 == node1.unchecked.count (node1.store.tx_begin_read ())); - auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), epoch1->previous ()); + ASSERT_TIMELY (10s, 2 == node1.unchecked.count ()); + auto blocks = node1.unchecked.get (epoch1->previous ()); ASSERT_EQ (blocks.size (), 2); } node1.block_processor.add (send1); @@ -4378,9 +4378,9 @@ TEST (ledger, unchecked_epoch_invalid) { auto transaction = node1.store.tx_begin_read (); ASSERT_FALSE (node1.store.block.exists (transaction, epoch1->hash ())); - auto unchecked_count = node1.unchecked.count (transaction); + auto unchecked_count = node1.unchecked.count (); ASSERT_EQ (unchecked_count, 0); - ASSERT_EQ (unchecked_count, node1.unchecked.count (transaction)); + ASSERT_EQ (unchecked_count, node1.unchecked.count ()); auto info = node1.ledger.account_info (transaction, destination.pub); ASSERT_TRUE (info); ASSERT_NE (info->epoch (), nano::epoch::epoch_1); @@ -4434,15 +4434,15 @@ TEST (ledger, unchecked_open) node1.block_processor.add (open1); { // Waits for the last blocks to pass through block_processor and unchecked.put queues - ASSERT_TIMELY (5s, 1 == node1.unchecked.count (node1.store.tx_begin_read ())); - // When open1 existists in unchecked, we know open2 has been processed. - auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), open1->source ()); + ASSERT_TIMELY (10s, 1 == node1.unchecked.count ()); + // Get the next peer for attempting a tcp bootstrap connection + auto blocks = node1.unchecked.get (open1->source ()); ASSERT_EQ (blocks.size (), 1); } node1.block_processor.add (send1); // Waits for the send1 block to pass through block_processor and unchecked.put queues ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), open1->hash ())); - ASSERT_EQ (0, node1.unchecked.count (node1.store.tx_begin_read ())); + ASSERT_EQ (0, node1.unchecked.count ()); } TEST (ledger, unchecked_receive) @@ -4493,13 +4493,13 @@ TEST (ledger, unchecked_receive) node1.block_processor.add (send1); node1.block_processor.add (receive1); auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return !node1.unchecked.get (transaction_a, block_hash_a).empty (); + return !node1.unchecked.get (block_hash_a).empty (); }; // Previous block for receive1 is unknown, signature cannot be validated { // Waits for the last blocks to pass through block_processor and unchecked.put queues ASSERT_TIMELY (15s, check_block_is_listed (node1.store.tx_begin_read (), receive1->previous ())); - auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), receive1->previous ()); + auto blocks = node1.unchecked.get (receive1->previous ()); ASSERT_EQ (blocks.size (), 1); } // Waits for the open1 block to pass through block_processor and unchecked.put queues @@ -4508,12 +4508,12 @@ TEST (ledger, unchecked_receive) // Previous block for receive1 is known, signature was validated { auto transaction = node1.store.tx_begin_read (); - auto blocks (node1.unchecked.get (transaction, receive1->source ())); + auto blocks (node1.unchecked.get (receive1->source ())); ASSERT_EQ (blocks.size (), 1); } node1.block_processor.add (send2); ASSERT_TIMELY (10s, node1.store.block.exists (node1.store.tx_begin_read (), receive1->hash ())); - ASSERT_EQ (0, node1.unchecked.count (node1.store.tx_begin_read ())); + ASSERT_EQ (0, node1.unchecked.count ()); } TEST (ledger, confirmation_height_not_updated) @@ -5542,8 +5542,7 @@ TEST (ledger, migrate_lmdb_to_rocksdb) uint16_t port = 100; nano::lmdb::store store{ logger, path / "data.ldb", nano::dev::constants }; nano::unchecked_map unchecked{ store, system.stats, false }; - nano::stats stats{}; - nano::ledger ledger{ store, stats, nano::dev::constants }; + nano::ledger ledger{ store, system.stats, nano::dev::constants }; nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits::max () }; std::shared_ptr send = nano::state_block_builder () diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 7b541beb..d7d11e1c 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2984,7 +2984,7 @@ TEST (node, block_processor_signatures) node1.process_active (receive2); node1.process_active (receive3); ASSERT_TIMELY (5s, node1.block (receive2->hash ()) != nullptr); // Implies send1, send2, send3, receive1. - ASSERT_TIMELY (5s, node1.unchecked.count (node1.store.tx_begin_read ()) == 0); + ASSERT_TIMELY (5s, node1.unchecked.count () == 0); ASSERT_EQ (nullptr, node1.block (receive3->hash ())); // Invalid signer ASSERT_EQ (nullptr, node1.block (send4->hash ())); // Invalid signature via process_active ASSERT_EQ (nullptr, node1.block (send5->hash ())); // Invalid signature via unchecked @@ -3289,17 +3289,17 @@ TEST (node, unchecked_cleanup) ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); node.process_active (open); // Waits for the open block to get saved in the database - ASSERT_TIMELY (15s, 1 == node.unchecked.count (node.store.tx_begin_read ())); + ASSERT_TIMELY (15s, 1 == node.unchecked.count ()); node.config.unchecked_cutoff_time = std::chrono::seconds (2); - ASSERT_EQ (1, node.unchecked.count (node.store.tx_begin_read ())); + ASSERT_EQ (1, node.unchecked.count ()); std::this_thread::sleep_for (std::chrono::seconds (1)); node.unchecked_cleanup (); ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); - ASSERT_EQ (1, node.unchecked.count (node.store.tx_begin_read ())); + ASSERT_EQ (1, node.unchecked.count ()); std::this_thread::sleep_for (std::chrono::seconds (2)); node.unchecked_cleanup (); ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); - ASSERT_EQ (0, node.unchecked.count (node.store.tx_begin_read ())); + ASSERT_EQ (0, node.unchecked.count ()); } /** This checks that a node can be opened (without being blocked) when a write lock is held elsewhere */ diff --git a/nano/core_test/unchecked_map.cpp b/nano/core_test/unchecked_map.cpp index ee7a3f63..c90f9d2b 100644 --- a/nano/core_test/unchecked_map.cpp +++ b/nano/core_test/unchecked_map.cpp @@ -73,18 +73,18 @@ TEST (block_store, one_bootstrap) .build_shared (); unchecked.put (block1->hash (), nano::unchecked_info{ block1 }); auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return unchecked.get (transaction_a, block_hash_a).size () > 0; + return unchecked.get (block_hash_a).size () > 0; }; // Waits for the block1 to get saved in the database ASSERT_TIMELY (10s, check_block_is_listed (store->tx_begin_read (), block1->hash ())); auto transaction = store->tx_begin_read (); std::vector dependencies; - unchecked.for_each (transaction, [&dependencies] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + unchecked.for_each ([&dependencies] (nano::unchecked_key const & key, nano::unchecked_info const & info) { dependencies.push_back (key.key ()); }); auto hash1 = dependencies[0]; ASSERT_EQ (block1->hash (), hash1); - auto blocks = unchecked.get (transaction, hash1); + auto blocks = unchecked.get (hash1); ASSERT_EQ (1, blocks.size ()); auto block2 = blocks[0].block; ASSERT_EQ (*block1, *block2); @@ -109,25 +109,24 @@ TEST (unchecked, simple) .work (5) .build_shared (); // Asserts the block wasn't added yet to the unchecked table - auto block_listing1 = unchecked.get (store->tx_begin_read (), block->previous ()); + auto block_listing1 = unchecked.get (block->previous ()); ASSERT_TRUE (block_listing1.empty ()); // Enqueues a block to be saved on the unchecked table unchecked.put (block->previous (), nano::unchecked_info (block)); // Waits for the block to get written in the database - auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return unchecked.get (transaction_a, block_hash_a).size () > 0; + auto check_block_is_listed = [&] (nano::block_hash const & block_hash_a) { + return unchecked.get (block_hash_a).size () > 0; }; - ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->previous ())); - auto transaction = store->tx_begin_write (); + ASSERT_TIMELY (5s, check_block_is_listed (block->previous ())); // Retrieves the block from the database - auto block_listing2 = unchecked.get (transaction, block->previous ()); + auto block_listing2 = unchecked.get (block->previous ()); ASSERT_FALSE (block_listing2.empty ()); // Asserts the added block is equal to the retrieved one ASSERT_EQ (*block, *(block_listing2[0].block)); // Deletes the block from the database - unchecked.del (transaction, nano::unchecked_key (block->previous (), block->hash ())); + unchecked.del (nano::unchecked_key (block->previous (), block->hash ())); // Asserts the block is deleted - auto block_listing3 = unchecked.get (transaction, block->previous ()); + auto block_listing3 = unchecked.get (block->previous ()); ASSERT_TRUE (block_listing3.empty ()); } @@ -154,19 +153,19 @@ TEST (unchecked, multiple) .work (5) .build_shared (); // Asserts the block wasn't added yet to the unchecked table - auto block_listing1 = unchecked.get (store->tx_begin_read (), block->previous ()); + auto block_listing1 = unchecked.get (block->previous ()); ASSERT_TRUE (block_listing1.empty ()); // Enqueues the first block unchecked.put (block->previous (), nano::unchecked_info (block)); // Enqueues a second block unchecked.put (block->source (), nano::unchecked_info (block)); - auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return unchecked.get (transaction_a, block_hash_a).size () > 0; + auto check_block_is_listed = [&] (nano::block_hash const & block_hash_a) { + return unchecked.get (block_hash_a).size () > 0; }; // Waits for and asserts the first block gets saved in the database - ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->previous ())); + ASSERT_TIMELY (5s, check_block_is_listed (block->previous ())); // Waits for and asserts the second block gets saved in the database - ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->source ())); + ASSERT_TIMELY (5s, check_block_is_listed (block->source ())); } // This test ensures that a block can't occur twice in the unchecked table. @@ -187,19 +186,19 @@ TEST (unchecked, double_put) .work (5) .build_shared (); // Asserts the block wasn't added yet to the unchecked table - auto block_listing1 = unchecked.get (store->tx_begin_read (), block->previous ()); + auto block_listing1 = unchecked.get (block->previous ()); ASSERT_TRUE (block_listing1.empty ()); // Enqueues the block to be saved in the unchecked table unchecked.put (block->previous (), nano::unchecked_info (block)); // Enqueues the block again in an attempt to have it there twice unchecked.put (block->previous (), nano::unchecked_info (block)); - auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) { - return unchecked.get (transaction_a, block_hash_a).size () > 0; + auto check_block_is_listed = [&] (nano::block_hash const & block_hash_a) { + return unchecked.get (block_hash_a).size () > 0; }; // Waits for and asserts the block was added at least once - ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->previous ())); + ASSERT_TIMELY (5s, check_block_is_listed (block->previous ())); // Asserts the block was added at most once -- this is objective of this test. - auto block_listing2 = unchecked.get (store->tx_begin_read (), block->previous ()); + auto block_listing2 = unchecked.get (block->previous ()); ASSERT_EQ (block_listing2.size (), 1); } @@ -251,8 +250,7 @@ TEST (unchecked, multiple_get) // we cannot trust the count() method if the backend is rocksdb auto count_unchecked_blocks_one_by_one = [&store, &unchecked] () { size_t count = 0; - auto transaction = store->tx_begin_read (); - unchecked.for_each (transaction, [&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + unchecked.for_each ([&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) { ++count; }); return count; @@ -263,8 +261,7 @@ TEST (unchecked, multiple_get) std::vector unchecked1; // Asserts the entries will be found for the provided key - auto transaction = store->tx_begin_read (); - auto unchecked1_blocks = unchecked.get (transaction, block1->previous ()); + auto unchecked1_blocks = unchecked.get (block1->previous ()); ASSERT_EQ (unchecked1_blocks.size (), 3); for (auto & i : unchecked1_blocks) { @@ -276,7 +273,7 @@ TEST (unchecked, multiple_get) ASSERT_TRUE (std::find (unchecked1.begin (), unchecked1.end (), block3->hash ()) != unchecked1.end ()); std::vector unchecked2; // Asserts the entries will be found for the provided key - auto unchecked2_blocks = unchecked.get (transaction, block1->hash ()); + auto unchecked2_blocks = unchecked.get (block1->hash ()); ASSERT_EQ (unchecked2_blocks.size (), 2); for (auto & i : unchecked2_blocks) { @@ -286,14 +283,14 @@ TEST (unchecked, multiple_get) ASSERT_TRUE (std::find (unchecked2.begin (), unchecked2.end (), block1->hash ()) != unchecked2.end ()); ASSERT_TRUE (std::find (unchecked2.begin (), unchecked2.end (), block2->hash ()) != unchecked2.end ()); // Asserts the entry is found by the key and the payload is saved - auto unchecked3 = unchecked.get (transaction, block2->previous ()); + auto unchecked3 = unchecked.get (block2->previous ()); ASSERT_EQ (unchecked3.size (), 1); ASSERT_EQ (unchecked3[0].block->hash (), block2->hash ()); // Asserts the entry is found by the key and the payload is saved - auto unchecked4 = unchecked.get (transaction, block3->hash ()); + auto unchecked4 = unchecked.get (block3->hash ()); ASSERT_EQ (unchecked4.size (), 1); ASSERT_EQ (unchecked4[0].block->hash (), block3->hash ()); // Asserts no entry is found for a block that wasn't added - auto unchecked5 = unchecked.get (transaction, block2->hash ()); + auto unchecked5 = unchecked.get (block2->hash ()); ASSERT_EQ (unchecked5.size (), 0); } diff --git a/nano/lib/config.hpp b/nano/lib/config.hpp index 6e785bfd..c4961c87 100644 --- a/nano/lib/config.hpp +++ b/nano/lib/config.hpp @@ -383,6 +383,8 @@ public: uint8_t const protocol_version = 0x13; /** Minimum accepted protocol version */ uint8_t const protocol_version_min = 0x12; + /** Minimum accepted protocol version used when bootstrapping */ + uint8_t const bootstrap_protocol_version_min = 0x13; }; std::string get_node_toml_config_path (boost::filesystem::path const & data_path); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index f69f1b21..d9253a9c 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -45,6 +45,13 @@ enum class type : uint8_t optimistic_scheduler, handshake, + bootstrap_server_requests, + bootstrap_server_responses, + bootstrap_ascending, + bootstrap_ascending_connections, + bootstrap_ascending_thread, + bootstrap_ascending_accounts, + _last // Must be the last enum }; @@ -280,6 +287,54 @@ enum class detail : uint8_t missing_cookie, invalid_genesis, + // bootstrap ascending + missing_tag, + reply, + track, + timeout, + nothing_new, + + // bootstrap ascending connections + connect, + connect_missing, + connect_failed, + connect_success, + reuse, + + // bootstrap ascending thread + read_block, + read_block_done, + read_block_end, + read_block_error, + + // bootstrap ascending accounts + prioritize, + prioritize_failed, + block, + unblock, + unblock_failed, + + next_priority, + next_database, + next_none, + + blocking_insert, + blocking_erase_overflow, + priority_insert, + priority_erase_threshold, + priority_erase_block, + priority_erase_overflow, + deprioritize, + deprioritize_failed, + + // active + started_hinted, + started_optimistic, + + // optimistic + pop_gap, + pop_leaf, + _last // Must be the last enum }; diff --git a/nano/lib/threading.cpp b/nano/lib/threading.cpp index d857c9e9..69867033 100644 --- a/nano/lib/threading.cpp +++ b/nano/lib/threading.cpp @@ -102,6 +102,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::vote_generator_queue: thread_role_name_string = "Voting que"; break; + case nano::thread_role::name::ascending_bootstrap: + thread_role_name_string = "Bootstrap asc"; + break; case nano::thread_role::name::bootstrap_server: thread_role_name_string = "Bootstrap serv"; break; diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index f8724166..1aaab00f 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -49,6 +49,9 @@ namespace thread_role bootstrap_server, telemetry, optimistic_scheduler, + ascending_bootstrap, + bootstrap_server_requests, + bootstrap_server_responses, }; /* diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index f04b2f55..e859d2b4 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -426,7 +426,7 @@ int main (int argc, char * const * argv) } // Check all unchecked keys for matching frontier hashes. Indicates an issue with process_batch algorithm - node->unchecked.for_each (transaction, [&frontier_hashes] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + node->unchecked.for_each ([&frontier_hashes] (nano::unchecked_key const & key, nano::unchecked_info const & info) { auto it = frontier_hashes.find (key.key ()); if (it != frontier_hashes.cend ()) { @@ -999,7 +999,7 @@ int main (int argc, char * const * argv) if (timer_l.after_deadline (std::chrono::seconds (15))) { timer_l.restart (); - std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked), %3% remaining") % node->ledger.cache.block_count % node->unchecked.count (node->store.tx_begin_read ()) % node->block_processor.size ()) << std::endl; + std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked), %3% remaining") % node->ledger.cache.block_count % node->unchecked.count () % node->block_processor.size ()) << std::endl; } } @@ -1847,7 +1847,7 @@ int main (int argc, char * const * argv) if (timer_l.after_deadline (std::chrono::seconds (60))) { timer_l.restart (); - std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked)") % node.node->ledger.cache.block_count % node.node->unchecked.count (node.node->store.tx_begin_read ())) << std::endl; + std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked)") % node.node->ledger.cache.block_count % node.node->unchecked.count ()) << std::endl; } } diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 9289b863..20a0b518 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -34,6 +34,8 @@ add_library( blockprocessor.cpp bootstrap/block_deserializer.hpp bootstrap/block_deserializer.cpp + bootstrap/bootstrap_ascending.hpp + bootstrap/bootstrap_ascending.cpp bootstrap/bootstrap_attempt.hpp bootstrap/bootstrap_attempt.cpp bootstrap/bootstrap_bulk_pull.hpp diff --git a/nano/node/bootstrap/bootstrap.hpp b/nano/node/bootstrap/bootstrap.hpp index 66852b78..5b6f62ef 100644 --- a/nano/node/bootstrap/bootstrap.hpp +++ b/nano/node/bootstrap/bootstrap.hpp @@ -27,7 +27,8 @@ enum class bootstrap_mode { legacy, lazy, - wallet_lazy + wallet_lazy, + ascending }; enum class sync_result { diff --git a/nano/node/bootstrap/bootstrap_ascending.cpp b/nano/node/bootstrap/bootstrap_ascending.cpp new file mode 100644 index 00000000..9cefe080 --- /dev/null +++ b/nano/node/bootstrap/bootstrap_ascending.cpp @@ -0,0 +1,855 @@ +#include +#include +#include +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +/* + * database_iterator + */ + +nano::bootstrap_ascending::database_iterator::database_iterator (nano::store & store_a, table_type table_a) : + store{ store_a }, + table{ table_a } +{ +} + +nano::account nano::bootstrap_ascending::database_iterator::operator* () const +{ + return current; +} + +void nano::bootstrap_ascending::database_iterator::next (nano::transaction & tx) +{ + switch (table) + { + case table_type::account: + { + auto i = current.number () + 1; + auto item = store.account.begin (tx, i); + if (item != store.account.end ()) + { + current = item->first; + } + else + { + current = { 0 }; + } + break; + } + case table_type::pending: + { + auto i = current.number () + 1; + auto item = store.pending.begin (tx, nano::pending_key{ i, 0 }); + if (item != store.pending.end ()) + { + current = item->first.account; + } + else + { + current = { 0 }; + } + break; + } + } +} + +/* + * buffered_iterator + */ + +nano::bootstrap_ascending::buffered_iterator::buffered_iterator (nano::store & store_a) : + store{ store_a }, + accounts_iterator{ store, database_iterator::table_type::account }, + pending_iterator{ store, database_iterator::table_type::pending } +{ +} + +nano::account nano::bootstrap_ascending::buffered_iterator::operator* () const +{ + return !buffer.empty () ? buffer.front () : nano::account{ 0 }; +} + +nano::account nano::bootstrap_ascending::buffered_iterator::next () +{ + if (!buffer.empty ()) + { + buffer.pop_front (); + } + else + { + fill (); + } + + return *(*this); +} + +void nano::bootstrap_ascending::buffered_iterator::fill () +{ + debug_assert (buffer.empty ()); + + // Fill half from accounts table and half from pending table + auto transaction = store.tx_begin_read (); + + for (int n = 0; n < size / 2; ++n) + { + accounts_iterator.next (transaction); + if (!(*accounts_iterator).is_zero ()) + { + buffer.push_back (*accounts_iterator); + } + } + + for (int n = 0; n < size / 2; ++n) + { + pending_iterator.next (transaction); + if (!(*pending_iterator).is_zero ()) + { + buffer.push_back (*pending_iterator); + } + } +} + +/* + * account_sets + */ + +nano::bootstrap_ascending::account_sets::account_sets (nano::stats & stats_a) : + stats{ stats_a } +{ +} + +void nano::bootstrap_ascending::account_sets::priority_up (nano::account const & account) +{ + if (!blocked (account)) + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize); + + auto iter = priorities.get ().find (account); + if (iter != priorities.get ().end ()) + { + priorities.get ().modify (iter, [] (auto & val) { + val.priority = std::min ((val.priority * account_sets::priority_increase), account_sets::priority_max); + }); + } + else + { + priorities.get ().insert ({ account, account_sets::priority_initial }); + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_insert); + + trim_overflow (); + } + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize_failed); + } +} + +void nano::bootstrap_ascending::account_sets::priority_down (nano::account const & account) +{ + auto iter = priorities.get ().find (account); + if (iter != priorities.get ().end ()) + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize); + + auto priority_new = iter->priority - account_sets::priority_decrease; + if (priority_new <= account_sets::priority_cutoff) + { + priorities.get ().erase (iter); + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_threshold); + } + else + { + priorities.get ().modify (iter, [priority_new] (auto & val) { + val.priority = priority_new; + }); + } + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize_failed); + } +} + +void nano::bootstrap_ascending::account_sets::block (nano::account const & account, nano::block_hash const & dependency) +{ + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::block); + + auto existing = priorities.get ().find (account); + auto entry = existing == priorities.get ().end () ? priority_entry{ 0, 0 } : *existing; + + priorities.get ().erase (account); + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_block); + + blocking.get ().insert ({ account, dependency, entry }); + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_insert); + + trim_overflow (); +} + +void nano::bootstrap_ascending::account_sets::unblock (nano::account const & account, std::optional const & hash) +{ + // Unblock only if the dependency is fulfilled + auto existing = blocking.get ().find (account); + if (existing != blocking.get ().end () && (!hash || existing->dependency == *hash)) + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::unblock); + + debug_assert (priorities.get ().count (account) == 0); + if (!existing->original_entry.account.is_zero ()) + { + debug_assert (existing->original_entry.account == account); + priorities.get ().insert (existing->original_entry); + } + else + { + priorities.get ().insert ({ account, account_sets::priority_initial }); + } + blocking.get ().erase (account); + + trim_overflow (); + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::unblock_failed); + } +} + +void nano::bootstrap_ascending::account_sets::timestamp (const nano::account & account, bool reset) +{ + const nano::millis_t tstamp = reset ? 0 : nano::milliseconds_since_epoch (); + + auto iter = priorities.get ().find (account); + if (iter != priorities.get ().end ()) + { + priorities.get ().modify (iter, [tstamp] (auto & entry) { + entry.timestamp = tstamp; + }); + } +} + +bool nano::bootstrap_ascending::account_sets::check_timestamp (const nano::account & account) const +{ + auto iter = priorities.get ().find (account); + if (iter != priorities.get ().end ()) + { + if (nano::milliseconds_since_epoch () - iter->timestamp < cooldown) + { + return false; + } + } + return true; +} + +void nano::bootstrap_ascending::account_sets::trim_overflow () +{ + if (priorities.size () > priorities_max) + { + // Evict the lowest priority entry + priorities.get ().erase (priorities.get ().begin ()); + + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_overflow); + } + if (blocking.size () > blocking_max) + { + // Evict the lowest priority entry + blocking.get ().erase (blocking.get ().begin ()); + + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_erase_overflow); + } +} + +nano::account nano::bootstrap_ascending::account_sets::next () +{ + if (priorities.empty ()) + { + return { 0 }; + } + + std::vector weights; + std::vector candidates; + + int iterations = 0; + while (candidates.size () < account_sets::consideration_count && iterations++ < account_sets::consideration_count * 10) + { + debug_assert (candidates.size () == weights.size ()); + + // Use a dedicated, uniformly distributed field for sampling to avoid problematic corner case when accounts in the queue are very close together + auto search = bootstrap_ascending::generate_id (); + auto iter = priorities.get ().lower_bound (search); + if (iter == priorities.get ().end ()) + { + iter = priorities.get ().begin (); + } + + if (check_timestamp (iter->account)) + { + candidates.push_back (iter->account); + weights.push_back (iter->priority); + } + } + + if (candidates.empty ()) + { + return { 0 }; // All sampled accounts are busy + } + + std::discrete_distribution dist{ weights.begin (), weights.end () }; + auto selection = dist (rng); + debug_assert (!weights.empty () && selection < weights.size ()); + auto result = candidates[selection]; + return result; +} + +bool nano::bootstrap_ascending::account_sets::blocked (nano::account const & account) const +{ + return blocking.get ().count (account) > 0; +} + +std::size_t nano::bootstrap_ascending::account_sets::priority_size () const +{ + return priorities.size (); +} + +std::size_t nano::bootstrap_ascending::account_sets::blocked_size () const +{ + return blocking.size (); +} + +float nano::bootstrap_ascending::account_sets::priority (nano::account const & account) const +{ + if (blocked (account)) + { + return 0.0f; + } + auto existing = priorities.get ().find (account); + if (existing != priorities.get ().end ()) + { + return existing->priority; + } + return account_sets::priority_cutoff; +} + +auto nano::bootstrap_ascending::account_sets::info () const -> info_t +{ + return { blocking, priorities }; +} + +std::unique_ptr nano::bootstrap_ascending::account_sets::collect_container_info (const std::string & name) +{ + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "priorities", priorities.size (), sizeof (decltype (priorities)::value_type) })); + composite->add_component (std::make_unique (container_info{ "blocking", blocking.size (), sizeof (decltype (blocking)::value_type) })); + return composite; +} + +/* + * priority_entry + */ + +nano::bootstrap_ascending::account_sets::priority_entry::priority_entry (nano::account account_a, float priority_a) : + account{ account_a }, + priority{ priority_a } +{ + id = nano::bootstrap_ascending::generate_id (); +} + +/* + * bootstrap_ascending + */ + +nano::bootstrap_ascending::bootstrap_ascending (nano::node & node_a, nano::store & store_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a) : + node{ node_a }, + store{ store_a }, + block_processor{ block_processor_a }, + ledger{ ledger_a }, + network{ network_a }, + stats{ stat_a }, + accounts{ stats }, + iterator{ store }, + limiter{ requests_limit, 1.0 }, + database_limiter{ database_requests_limit, 1.0 } +{ + // TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread + block_processor.batch_processed.add ([this] (auto const & batch) { + { + nano::lock_guard lock{ mutex }; + + auto transaction = store.tx_begin_read (); + for (auto const & [result, block] : batch) + { + debug_assert (block != nullptr); + + inspect (transaction, result, *block); + } + } + + condition.notify_all (); + }); +} + +nano::bootstrap_ascending::~bootstrap_ascending () +{ + // All threads must be stopped before destruction + debug_assert (!thread.joinable ()); + debug_assert (!timeout_thread.joinable ()); +} + +void nano::bootstrap_ascending::start () +{ + debug_assert (!thread.joinable ()); + debug_assert (!timeout_thread.joinable ()); + + thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); + run (); + }); + + timeout_thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); + run_timeouts (); + }); +} + +void nano::bootstrap_ascending::stop () +{ + nano::unique_lock lock{ mutex }; + stopped = true; + lock.unlock (); + condition.notify_all (); + nano::join_or_pass (thread); + nano::join_or_pass (timeout_thread); +} + +nano::bootstrap_ascending::id_t nano::bootstrap_ascending::generate_id () +{ + id_t id; + nano::random_pool::generate_block (reinterpret_cast (&id), sizeof (id)); + return id; +} + +void nano::bootstrap_ascending::send (std::shared_ptr channel, async_tag tag) +{ + debug_assert (tag.type == async_tag::query_type::blocks_by_hash || tag.type == async_tag::query_type::blocks_by_account); + + nano::asc_pull_req request{ node.network_params.network }; + request.id = tag.id; + request.type = nano::asc_pull_type::blocks; + + nano::asc_pull_req::blocks_payload request_payload; + request_payload.start = tag.start; + request_payload.count = pull_count; + request_payload.start_type = tag.type == async_tag::query_type::blocks_by_hash ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account; + + request.payload = request_payload; + request.update_header (); + + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out); + + // TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests + channel->send ( + request, nullptr, + nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap); +} + +size_t nano::bootstrap_ascending::priority_size () const +{ + nano::lock_guard lock{ mutex }; + return accounts.priority_size (); +} + +size_t nano::bootstrap_ascending::blocked_size () const +{ + nano::lock_guard lock{ mutex }; + return accounts.blocked_size (); +} + +/** Inspects a block that has been processed by the block processor +- Marks an account as blocked if the result code is gap source as there is no reason request additional blocks for this account until the dependency is resolved +- Marks an account as forwarded if it has been recently referenced by a block that has been inserted. + */ +void nano::bootstrap_ascending::inspect (nano::transaction const & tx, nano::process_return const & result, nano::block const & block) +{ + auto const hash = block.hash (); + + switch (result.code) + { + case nano::process_result::progress: + { + const auto account = ledger.account (tx, hash); + const auto is_send = ledger.is_send (tx, block); + + // If we've inserted any block in to an account, unmark it as blocked + accounts.unblock (account); + accounts.priority_up (account); + accounts.timestamp (account, /* reset timestamp */ true); + + if (is_send) + { + // TODO: Encapsulate this as a helper somewhere + nano::account destination{ 0 }; + switch (block.type ()) + { + case nano::block_type::send: + destination = block.destination (); + break; + case nano::block_type::state: + destination = block.link ().as_account (); + break; + default: + debug_assert (false, "unexpected block type"); + break; + } + if (!destination.is_zero ()) + { + accounts.unblock (destination, hash); // Unblocking automatically inserts account into priority set + accounts.priority_up (destination); + } + } + } + break; + case nano::process_result::gap_source: + { + const auto account = block.previous ().is_zero () ? block.account () : ledger.account (tx, block.previous ()); + const auto source = block.source ().is_zero () ? block.link ().as_block_hash () : block.source (); + + // Mark account as blocked because it is missing the source block + accounts.block (account, source); + + // TODO: Track stats + } + break; + case nano::process_result::old: + { + // TODO: Track stats + } + break; + case nano::process_result::gap_previous: + { + // TODO: Track stats + } + break; + default: // No need to handle other cases + break; + } +} + +void nano::bootstrap_ascending::wait_blockprocessor () +{ + while (!stopped && block_processor.half_full ()) + { + std::this_thread::sleep_for (500ms); // Blockprocessor is relatively slow, sleeping here instead of using conditions + } +} + +void nano::bootstrap_ascending::wait_available_request () +{ + while (!stopped && !limiter.should_pass (1)) + { + std::this_thread::sleep_for (50ms); // Give it at least some time to cooldown to avoid hitting the limit too frequently + } +} + +std::shared_ptr nano::bootstrap_ascending::available_channel () +{ + auto channels = network.random_set (32, node.network_params.network.bootstrap_protocol_version_min, /* include temporary channels */ true); + for (auto & channel : channels) + { + if (!channel->max ()) + { + return channel; + } + } + return nullptr; +} + +std::shared_ptr nano::bootstrap_ascending::wait_available_channel () +{ + std::shared_ptr channel; + while (!stopped && !(channel = available_channel ())) + { + std::this_thread::sleep_for (100ms); + } + return channel; +} + +nano::account nano::bootstrap_ascending::available_account () +{ + { + auto account = accounts.next (); + if (!account.is_zero ()) + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_priority); + return account; + } + } + + if (database_limiter.should_pass (1)) + { + auto account = iterator.next (); + if (!account.is_zero ()) + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_database); + return account; + } + } + + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_none); + return { 0 }; +} + +nano::account nano::bootstrap_ascending::wait_available_account () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + auto account = available_account (); + if (!account.is_zero ()) + { + accounts.timestamp (account); + return account; + } + else + { + condition.wait_for (lock, 100ms); + } + } + return { 0 }; +} + +bool nano::bootstrap_ascending::request (nano::account & account, std::shared_ptr & channel) +{ + async_tag tag{}; + tag.id = generate_id (); + tag.account = account; + tag.time = nano::milliseconds_since_epoch (); + + // Check if the account picked has blocks, if it does, start the pull from the highest block + auto info = store.account.get (store.tx_begin_read (), account); + if (info) + { + tag.type = async_tag::query_type::blocks_by_hash; + tag.start = info->head; + } + else + { + tag.type = async_tag::query_type::blocks_by_account; + tag.start = account; + } + + on_request.notify (tag, channel); + + track (tag); + send (channel, tag); + + return true; // Request sent +} + +bool nano::bootstrap_ascending::run_one () +{ + // Ensure there is enough space in blockprocessor for queuing new blocks + wait_blockprocessor (); + + // Do not do too many requests in parallel, impose throttling + wait_available_request (); + + // Waits for channel that is not full + auto channel = wait_available_channel (); + if (!channel) + { + return false; + } + + // Waits for account either from priority queue or database + auto account = wait_available_account (); + if (account.is_zero ()) + { + return false; + } + + bool success = request (account, channel); + return success; +} + +void nano::bootstrap_ascending::run () +{ + while (!stopped) + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop); + run_one (); + } +} + +void nano::bootstrap_ascending::run_timeouts () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + auto & tags_by_order = tags.get (); + while (!tags_by_order.empty () && nano::time_difference (tags_by_order.front ().time, nano::milliseconds_since_epoch ()) > timeout) + { + auto tag = tags_by_order.front (); + tags_by_order.pop_front (); + on_timeout.notify (tag); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout); + } + condition.wait_for (lock, 1s, [this] () { return stopped; }); + } +} + +void nano::bootstrap_ascending::process (const nano::asc_pull_ack & message) +{ + nano::unique_lock lock{ mutex }; + + // Only process messages that have a known tag + auto & tags_by_id = tags.get (); + if (tags_by_id.count (message.id) > 0) + { + auto iterator = tags_by_id.find (message.id); + auto tag = *iterator; + tags_by_id.erase (iterator); + + lock.unlock (); + + on_reply.notify (tag); + condition.notify_all (); + std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload); + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::missing_tag); + } +} + +void nano::bootstrap_ascending::process (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::async_tag & tag) +{ + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply); + + auto result = verify (response, tag); + switch (result) + { + case verify_result::ok: + { + stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ()); + + for (auto & block : response.blocks) + { + block_processor.add (block); + } + } + break; + case verify_result::nothing_new: + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new); + + { + nano::lock_guard lock{ mutex }; + accounts.priority_down (tag.account); + } + } + break; + case verify_result::invalid: + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid); + // TODO: Log + } + break; + } +} + +void nano::bootstrap_ascending::process (const nano::asc_pull_ack::account_info_payload & response, const nano::bootstrap_ascending::async_tag & tag) +{ + // TODO: Make use of account info +} + +void nano::bootstrap_ascending::process (const nano::empty_payload & response, const nano::bootstrap_ascending::async_tag & tag) +{ + // Should not happen + debug_assert (false, "empty payload"); +} + +nano::bootstrap_ascending::verify_result nano::bootstrap_ascending::verify (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::async_tag & tag) const +{ + auto const & blocks = response.blocks; + + if (blocks.empty ()) + { + return verify_result::nothing_new; + } + if (blocks.size () == 1 && blocks.front ()->hash () == tag.start.as_block_hash ()) + { + return verify_result::nothing_new; + } + + auto const & first = blocks.front (); + switch (tag.type) + { + case async_tag::query_type::blocks_by_hash: + { + if (first->hash () != tag.start.as_block_hash ()) + { + // TODO: Stat & log + return verify_result::invalid; + } + } + break; + case async_tag::query_type::blocks_by_account: + { + // Open & state blocks always contain account field + if (first->account () != tag.start.as_account ()) + { + // TODO: Stat & log + return verify_result::invalid; + } + } + break; + default: + return verify_result::invalid; + } + + // Verify blocks make a valid chain + nano::block_hash previous_hash = blocks.front ()->hash (); + for (int n = 1; n < blocks.size (); ++n) + { + auto & block = blocks[n]; + if (block->previous () != previous_hash) + { + // TODO: Stat & log + return verify_result::invalid; // Blocks do not make a chain + } + previous_hash = block->hash (); + } + + return verify_result::ok; +} + +void nano::bootstrap_ascending::track (async_tag const & tag) +{ + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::track); + + nano::lock_guard lock{ mutex }; + debug_assert (tags.get ().count (tag.id) == 0); + tags.get ().insert (tag); +} + +auto nano::bootstrap_ascending::info () const -> account_sets::info_t +{ + nano::lock_guard lock{ mutex }; + return accounts.info (); +} + +std::unique_ptr nano::bootstrap_ascending::collect_container_info (std::string const & name) +{ + nano::lock_guard lock{ mutex }; + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "tags", tags.size (), sizeof (decltype (tags)::value_type) })); + composite->add_component (accounts.collect_container_info ("accounts")); + return composite; +} diff --git a/nano/node/bootstrap/bootstrap_ascending.hpp b/nano/node/bootstrap/bootstrap_ascending.hpp new file mode 100644 index 00000000..62bac5b6 --- /dev/null +++ b/nano/node/bootstrap/bootstrap_ascending.hpp @@ -0,0 +1,329 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace mi = boost::multi_index; + +namespace nano +{ +class block_processor; +class ledger; +class network; + +namespace transport +{ + class channel; +} + +class bootstrap_ascending +{ + using id_t = uint64_t; + +public: + bootstrap_ascending (nano::node &, nano::store &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &); + ~bootstrap_ascending (); + + void start (); + void stop (); + + /** + * Process `asc_pull_ack` message coming from network + */ + void process (nano::asc_pull_ack const & message); + +public: // Container info + std::unique_ptr collect_container_info (std::string const & name); + size_t blocked_size () const; + size_t priority_size () const; + +private: // Dependencies + nano::node & node; + nano::store & store; + nano::block_processor & block_processor; + nano::ledger & ledger; + nano::network & network; + nano::stats & stats; + +public: // async_tag + struct async_tag + { + enum class query_type + { + invalid = 0, // Default initialization + blocks_by_hash, + blocks_by_account, + // TODO: account_info, + }; + + query_type type{ query_type::invalid }; + id_t id{ 0 }; + nano::hash_or_account start{ 0 }; + nano::millis_t time{ 0 }; + nano::account account{ 0 }; + }; + +public: // Events + nano::observer_set &> on_request; + nano::observer_set on_reply; + nano::observer_set on_timeout; + +private: + /* Inspects a block that has been processed by the block processor */ + void inspect (nano::transaction const &, nano::process_return const & result, nano::block const & block); + + void run (); + bool run_one (); + void run_timeouts (); + + /* Limits the number of requests per second we make */ + void wait_available_request (); + /* Throttles requesting new blocks, not to overwhelm blockprocessor */ + void wait_blockprocessor (); + /* Waits for channel with free capacity for bootstrap messages */ + std::shared_ptr wait_available_channel (); + std::shared_ptr available_channel (); + /* Waits until a suitable account outside of cool down period is available */ + nano::account available_account (); + nano::account wait_available_account (); + + bool request (nano::account &, std::shared_ptr &); + void send (std::shared_ptr, async_tag tag); + void track (async_tag const & tag); + + void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag); + void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag); + void process (nano::empty_payload const & response, async_tag const & tag); + + enum class verify_result + { + ok, + nothing_new, + invalid, + }; + + /** + * Verifies whether the received response is valid. Returns: + * - invalid: when received blocks do not correspond to requested hash/account or they do not make a valid chain + * - nothing_new: when received response indicates that the account chain does not have more blocks + * - ok: otherwise, if all checks pass + */ + verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const; + + static id_t generate_id (); + +public: // account_sets + /** This class tracks accounts various account sets which are shared among the multiple bootstrap threads */ + class account_sets + { + public: + explicit account_sets (nano::stats &); + + /** + * If an account is not blocked, increase its priority. + * If the account does not exist in priority set and is not blocked, inserts a new entry. + * Current implementation increases priority by 1.0f each increment + */ + void priority_up (nano::account const & account); + /** + * Decreases account priority + * Current implementation divides priority by 2.0f and saturates down to 1.0f. + */ + void priority_down (nano::account const & account); + void block (nano::account const & account, nano::block_hash const & dependency); + void unblock (nano::account const & account, std::optional const & hash = std::nullopt); + void timestamp (nano::account const & account, bool reset = false); + + nano::account next (); + + public: + bool blocked (nano::account const & account) const; + std::size_t priority_size () const; + std::size_t blocked_size () const; + /** + * Accounts in the ledger but not in priority list are assumed priority 1.0f + * Blocked accounts are assumed priority 0.0f + */ + float priority (nano::account const & account) const; + + public: // Container info + std::unique_ptr collect_container_info (std::string const & name); + + private: + void trim_overflow (); + bool check_timestamp (nano::account const & account) const; + + private: // Dependencies + nano::stats & stats; + + private: + struct priority_entry + { + nano::account account{ 0 }; + float priority{ 0 }; + nano::millis_t timestamp{ 0 }; + id_t id{ 0 }; // Uniformly distributed, used for random querying + + priority_entry (nano::account account, float priority); + }; + + struct blocking_entry + { + nano::account account{ 0 }; + nano::block_hash dependency{ 0 }; + priority_entry original_entry{ 0, 0 }; + + float priority () const + { + return original_entry.priority; + } + }; + + // clang-format off + class tag_account {}; + class tag_priority {}; + class tag_sequenced {}; + class tag_id {}; + + // Tracks the ongoing account priorities + // This only stores account priorities > 1.0f. + using ordered_priorities = boost::multi_index_container>, + mi::ordered_unique, + mi::member>, + mi::ordered_non_unique, + mi::member>, + mi::ordered_unique, + mi::member> + >>; + + // A blocked account is an account that has failed to insert a new block because the source block is not currently present in the ledger + // An account is unblocked once it has a block successfully inserted + using ordered_blocking = boost::multi_index_container>, + mi::ordered_unique, + mi::member>, + mi::ordered_non_unique, + mi::const_mem_fun> + >>; + // clang-format on + + ordered_priorities priorities; + ordered_blocking blocking; + + std::default_random_engine rng; + + private: // TODO: Move into config + static std::size_t constexpr consideration_count = 4; + static std::size_t constexpr priorities_max = 256 * 1024; + static std::size_t constexpr blocking_max = 256 * 1024; + static nano::millis_t constexpr cooldown = 3 * 1000; + + public: // Consts + static float constexpr priority_initial = 8.0f; + static float constexpr priority_increase = 2.0f; + static float constexpr priority_decrease = 0.5f; + static float constexpr priority_max = 32.0f; + static float constexpr priority_cutoff = 1.0f; + + public: + using info_t = std::tuple; // + info_t info () const; + }; + + account_sets::info_t info () const; + +private: // Database iterators + class database_iterator + { + public: + enum class table_type + { + account, + pending + }; + + explicit database_iterator (nano::store & store, table_type); + nano::account operator* () const; + void next (nano::transaction & tx); + + private: + nano::store & store; + nano::account current{ 0 }; + const table_type table; + }; + + class buffered_iterator + { + public: + explicit buffered_iterator (nano::store & store); + nano::account operator* () const; + nano::account next (); + + private: + void fill (); + + private: + nano::store & store; + std::deque buffer; + + database_iterator accounts_iterator; + database_iterator pending_iterator; + + static std::size_t constexpr size = 1024; + }; + +private: + account_sets accounts; + buffered_iterator iterator; + + // clang-format off + class tag_sequenced {}; + class tag_id {}; + class tag_account {}; + + using ordered_tags = boost::multi_index_container>, + mi::hashed_unique, + mi::member>, + mi::hashed_non_unique, + mi::member> + >>; + // clang-format on + ordered_tags tags; + + nano::bandwidth_limiter limiter; + // Requests for accounts from database have much lower hitrate and could introduce strain on the network + // A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue + nano::bandwidth_limiter database_limiter; + + bool stopped{ false }; + mutable nano::mutex mutex; + mutable nano::condition_variable condition; + std::thread thread; + std::thread timeout_thread; + +private: // TODO: Move into config + static std::size_t constexpr requests_limit{ 128 }; + static std::size_t constexpr database_requests_limit{ 1024 }; + static std::size_t constexpr pull_count{ nano::bootstrap_server::max_blocks }; + static nano::millis_t constexpr timeout{ 1000 * 3 }; +}; +} diff --git a/nano/node/bootstrap/bootstrap_attempt.cpp b/nano/node/bootstrap/bootstrap_attempt.cpp index 58990698..de9ca3e6 100644 --- a/nano/node/bootstrap/bootstrap_attempt.cpp +++ b/nano/node/bootstrap/bootstrap_attempt.cpp @@ -99,6 +99,8 @@ char const * nano::bootstrap_attempt::mode_text () return "lazy"; case nano::bootstrap_mode::wallet_lazy: return "wallet_lazy"; + case nano::bootstrap_mode::ascending: + return "ascending"; } return "unknown"; } diff --git a/nano/node/bootstrap/bootstrap_bulk_pull.cpp b/nano/node/bootstrap/bootstrap_bulk_pull.cpp index ffcd38e8..6f3ff097 100644 --- a/nano/node/bootstrap/bootstrap_bulk_pull.cpp +++ b/nano/node/bootstrap/bootstrap_bulk_pull.cpp @@ -348,7 +348,7 @@ void nano::bulk_pull_server::set_current_end () connection->node->logger.try_log (boost::str (boost::format ("Bulk pull request for block hash: %1%") % request->start.to_string ())); } - current = request->start.as_block_hash (); + current = ascending () ? connection->node->store.block.successor (transaction, request->start.as_block_hash ()) : request->start.as_block_hash (); include_start = true; } else diff --git a/nano/node/bootstrap/bootstrap_connections.cpp b/nano/node/bootstrap/bootstrap_connections.cpp index d87d4bd9..ce901561 100644 --- a/nano/node/bootstrap/bootstrap_connections.cpp +++ b/nano/node/bootstrap/bootstrap_connections.cpp @@ -172,8 +172,8 @@ void nano::bootstrap_connections::connect_client (nano::tcp_endpoint const & end case boost::system::errc::connection_refused: case boost::system::errc::operation_canceled: case boost::system::errc::timed_out: - case 995: //Windows The I/O operation has been aborted because of either a thread exit or an application request - case 10061: //Windows No connection could be made because the target machine actively refused it + case 995: // Windows The I/O operation has been aborted because of either a thread exit or an application request + case 10061: // Windows No connection could be made because the target machine actively refused it break; } } @@ -286,7 +286,7 @@ void nano::bootstrap_connections::populate_connections (bool repeat) // Not many peers respond, need to try to make more connections than we need. for (auto i = 0u; i < delta; i++) { - auto endpoint (node.network.bootstrap_peer ()); + auto endpoint (node.network.bootstrap_peer ()); // Legacy bootstrap is compatible with older version of protocol if (endpoint != nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0) && (node.flags.allow_bootstrap_peers_duplicates || endpoints.find (endpoint) == endpoints.end ()) && !node.network.excluded_peers.check (endpoint)) { connect_client (endpoint); diff --git a/nano/node/cli.cpp b/nano/node/cli.cpp index d780ccce..5dd2c39f 100644 --- a/nano/node/cli.cpp +++ b/nano/node/cli.cpp @@ -94,6 +94,7 @@ void nano::add_node_flag_options (boost::program_options::options_description & ("disable_legacy_bootstrap", "Disables legacy bootstrap") ("disable_wallet_bootstrap", "Disables wallet lazy bootstrap") ("disable_ongoing_bootstrap", "Disable ongoing bootstrap") + ("disable_ascending_bootstrap", "Disable ascending bootstrap") ("disable_rep_crawler", "Disable rep crawler") ("disable_request_loop", "Disable request loop") ("disable_bootstrap_listener", "Disables bootstrap processing for TCP listener (not including realtime network TCP connections)") @@ -122,6 +123,7 @@ std::error_code nano::update_flags (nano::node_flags & flags_a, boost::program_o flags_a.disable_legacy_bootstrap = (vm.count ("disable_legacy_bootstrap") > 0); flags_a.disable_wallet_bootstrap = (vm.count ("disable_wallet_bootstrap") > 0); flags_a.disable_ongoing_bootstrap = (vm.count ("disable_ongoing_bootstrap") > 0); + flags_a.disable_ascending_bootstrap = (vm.count ("disable_ascending_bootstrap") > 0); flags_a.disable_rep_crawler = (vm.count ("disable_rep_crawler") > 0); flags_a.disable_request_loop = (vm.count ("disable_request_loop") > 0); if (!flags_a.inactive_node) @@ -213,7 +215,7 @@ bool copy_database (boost::filesystem::path const & data_path, boost::program_op auto & store (node.node->store); if (vm.count ("unchecked_clear")) { - node.node->unchecked.clear (store.tx_begin_write ()); + node.node->unchecked.clear (); } if (vm.count ("clear_send_ids")) { @@ -491,7 +493,7 @@ std::error_code nano::handle_node_options (boost::program_options::variables_map if (!node.node->init_error ()) { auto transaction (node.node->store.tx_begin_write ()); - node.node->unchecked.clear (transaction); + node.node->unchecked.clear (); std::cout << "Unchecked blocks deleted" << std::endl; } else diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 20548def..b2e027c0 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -1422,7 +1423,7 @@ void nano::json_handler::block_account () void nano::json_handler::block_count () { response_l.put ("count", std::to_string (node.ledger.cache.block_count)); - response_l.put ("unchecked", std::to_string (node.unchecked.count (node.store.tx_begin_read ()))); + response_l.put ("unchecked", std::to_string (node.unchecked.count ())); response_l.put ("cemented", std::to_string (node.ledger.cache.cemented_count)); if (node.flags.enable_pruning) { @@ -4115,7 +4116,7 @@ void nano::json_handler::unchecked () boost::property_tree::ptree unchecked; auto transaction (node.store.tx_begin_read ()); node.unchecked.for_each ( - transaction, [&unchecked, &json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + [&unchecked, &json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) { if (json_block_l) { boost::property_tree::ptree block_node_l; @@ -4137,7 +4138,7 @@ void nano::json_handler::unchecked_clear () { node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto transaction (rpc_l->node.store.tx_begin_write ({ tables::unchecked })); - rpc_l->node.unchecked.clear (transaction); + rpc_l->node.unchecked.clear (); rpc_l->response_l.put ("success", ""); rpc_l->response_errors (); })); @@ -4151,7 +4152,7 @@ void nano::json_handler::unchecked_get () { bool done = false; node.unchecked.for_each ( - node.store.tx_begin_read (), [&] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + [&] (nano::unchecked_key const & key, nano::unchecked_info const & info) { if (key.hash == hash) { response_l.put ("modified_timestamp", std::to_string (info.modified ())); @@ -4196,7 +4197,8 @@ void nano::json_handler::unchecked_keys () boost::property_tree::ptree unchecked; auto transaction (node.store.tx_begin_read ()); node.unchecked.for_each ( - transaction, key, [&unchecked, json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + key, + [&unchecked, json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) { boost::property_tree::ptree entry; entry.put ("key", key.key ().to_string ()); entry.put ("hash", info.block->hash ().to_string ()); @@ -5219,6 +5221,40 @@ void nano::json_handler::populate_backlog () response_errors (); } +void nano::json_handler::debug_bootstrap_priority_info () +{ + if (!ec) + { + auto [blocking, priorities] = node.ascendboot.info (); + + // priorities + { + boost::property_tree::ptree response_priorities; + for (auto const & entry : priorities) + { + const auto account = entry.account; + const auto priority = entry.priority; + + response_priorities.put (account.to_account (), priority); + } + response_l.add_child ("priorities", response_priorities); + } + // blocking + { + boost::property_tree::ptree response_blocking; + for (auto const & entry : blocking) + { + const auto account = entry.account; + const auto dependency = entry.dependency; + + response_blocking.put (account.to_account (), dependency.to_string ()); + } + response_l.add_child ("blocking", response_blocking); + } + } + response_errors (); +} + void nano::inprocess_rpc_handler::process_request (std::string const &, std::string const & body_a, std::function response_a) { // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler @@ -5384,6 +5420,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map () no_arg_funcs.emplace ("work_peers", &nano::json_handler::work_peers); no_arg_funcs.emplace ("work_peers_clear", &nano::json_handler::work_peers_clear); no_arg_funcs.emplace ("populate_backlog", &nano::json_handler::populate_backlog); + no_arg_funcs.emplace ("debug_bootstrap_priority_info", &nano::json_handler::debug_bootstrap_priority_info); return no_arg_funcs; } diff --git a/nano/node/json_handler.hpp b/nano/node/json_handler.hpp index 4d2a055d..60a257e3 100644 --- a/nano/node/json_handler.hpp +++ b/nano/node/json_handler.hpp @@ -65,6 +65,7 @@ public: void confirmation_info (); void confirmation_quorum (); void confirmation_height_currently_processing (); + void debug_bootstrap_priority_info (); void database_txn_tracker (); void delegators (); void delegators_count (); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 9e3e7a5f..336e9ac1 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -511,7 +512,7 @@ public: void asc_pull_ack (nano::asc_pull_ack const & message) override { - // TODO: Process in ascending bootstrap client + node.ascendboot.process (message); } private: diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 35940208..0253496e 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -111,7 +111,7 @@ public: void random_fill (std::array &) const; void fill_keepalive_self (std::array &) const; // Note: The minimum protocol version is used after the random selection, so number of peers can be less than expected. - std::unordered_set> random_set (std::size_t, uint8_t = 0, bool = false) const; + std::unordered_set> random_set (std::size_t count, uint8_t min_version = 0, bool include_temporary_channels = false) const; // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (); nano::endpoint endpoint () const; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index ea551db9..e233eddc 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -201,6 +201,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co aggregator (config, stats, generator, final_generator, history, ledger, wallets, active), wallets (wallets_store.init_error (), *this), backlog{ nano::backlog_population_config (config), store, stats }, + ascendboot{ *this, store, block_processor, ledger, network, stats }, websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger }, epoch_upgrader{ *this, ledger, store, network_params, logger }, startup_time (std::chrono::steady_clock::now ()), @@ -214,10 +215,9 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co block_publisher.connect (block_processor); gap_tracker.connect (block_processor); process_live_dispatcher.connect (block_processor); - unchecked.use_memory = [this] () { return ledger.bootstrap_weight_reached (); }; - unchecked.satisfied = [this] (nano::unchecked_info const & info) { + unchecked.satisfied.add ([this] (nano::unchecked_info const & info) { this->block_processor.add (info.block); - }; + }); inactive_vote_cache.rep_weight_query = [this] (nano::account const & rep) { return ledger.weight (rep); @@ -442,7 +442,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co if (!flags.disable_unchecked_drop && !use_bootstrap_weight && !flags.read_only) { auto const transaction (store.tx_begin_write ({ tables::unchecked })); - unchecked.clear (transaction); + unchecked.clear (); logger.always_log ("Dropping unchecked blocks"); } } @@ -585,6 +585,8 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (node.inactive_vote_cache.collect_container_info ("inactive_vote_cache")); composite->add_component (collect_container_info (node.generator, "vote_generator")); composite->add_component (collect_container_info (node.final_generator, "vote_generator_final")); + composite->add_component (node.ascendboot.collect_container_info ("bootstrap_ascending")); + composite->add_component (node.unchecked.collect_container_info ("unchecked")); return composite; } @@ -696,6 +698,10 @@ void nano::node::start () backlog.start (); hinting.start (); bootstrap_server.start (); + if (!flags.disable_ascending_bootstrap) + { + ascendboot.start (); + } websocket.start (); telemetry.start (); } @@ -714,6 +720,10 @@ void nano::node::stop () // No tasks may wait for work generation in I/O threads, or termination signal capturing will be unable to call node::stop() distributed_work.stop (); backlog.stop (); + if (!flags.disable_ascending_bootstrap) + { + ascendboot.stop (); + } unchecked.stop (); block_processor.stop (); aggregator.stop (); @@ -971,7 +981,7 @@ void nano::node::unchecked_cleanup () auto const transaction (store.tx_begin_read ()); // Max 1M records to clean, max 2 minutes reading to prevent slow i/o systems issues unchecked.for_each ( - transaction, [this, &digests, &cleaning_list, &now] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + [this, &digests, &cleaning_list, &now] (nano::unchecked_key const & key, nano::unchecked_info const & info) { if ((now - info.modified ()) > static_cast (config.unchecked_cutoff_time.count ())) { digests.push_back (network.publish_filter.hash (info.block)); @@ -991,9 +1001,9 @@ void nano::node::unchecked_cleanup () { auto key (cleaning_list.front ()); cleaning_list.pop_front (); - if (unchecked.exists (transaction, key)) + if (unchecked.exists (key)) { - unchecked.del (transaction, key); + unchecked.del (key); } } } @@ -1497,7 +1507,7 @@ nano::telemetry_data nano::node::local_telemetry () const telemetry_data.bandwidth_cap = config.bandwidth_limit; telemetry_data.protocol_version = network_params.network.protocol_version; telemetry_data.uptime = std::chrono::duration_cast (std::chrono::steady_clock::now () - startup_time).count (); - telemetry_data.unchecked_count = unchecked.count (ledger.store.tx_begin_read ()); + telemetry_data.unchecked_count = unchecked.count (); telemetry_data.genesis_block = network_params.ledger.genesis->hash (); telemetry_data.peer_count = nano::narrow_cast (network.size ()); telemetry_data.account_count = ledger.cache.account_count; diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 987dce2e..37a0669f 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -192,6 +193,7 @@ public: nano::request_aggregator aggregator; nano::wallets wallets; nano::backlog_population backlog; + nano::bootstrap_ascending ascendboot; nano::websocket_server websocket; nano::epoch_upgrader epoch_upgrader; nano::block_broadcast block_broadcast; diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 9b59a439..83826a21 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -101,8 +101,8 @@ public: std::size_t bandwidth_limit{ 10 * 1024 * 1024 }; /** By default, allow bursts of 15MB/s (not sustainable) */ double bandwidth_limit_burst_ratio{ 3. }; - /** Default boostrap outbound traffic limit is 16MB/s ~ 128Mbit/s */ - std::size_t bootstrap_bandwidth_limit{ 16 * 1024 * 1024 }; + /** Default boostrap outbound traffic limit is 5MB/s */ + std::size_t bootstrap_bandwidth_limit{ 5 * 1024 * 1024 }; /** Bootstrap traffic does not need bursts */ double bootstrap_bandwidth_burst_ratio{ 1. }; std::chrono::milliseconds conf_height_processor_batch_min_time{ 50 }; @@ -140,6 +140,7 @@ public: bool disable_bootstrap_bulk_pull_server{ false }; bool disable_bootstrap_bulk_push_client{ false }; bool disable_ongoing_bootstrap{ false }; // For testing only + bool disable_ascending_bootstrap{ false }; bool disable_rep_crawler{ false }; bool disable_request_loop{ false }; // For testing only bool disable_tcp_realtime{ false }; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 676bb0dd..db828cd5 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -189,6 +189,11 @@ std::unordered_set> nano::transport::t auto index (nano::random_pool::generate_word32 (0, static_cast (peers_size - 1))); auto channel = channels.get ()[index].channel; + if (!channel->alive ()) + { + continue; + } + if (channel->get_network_version () >= min_version && (include_temporary_channels_a || !channel->temporary)) { result.insert (channel); diff --git a/nano/node/unchecked_map.cpp b/nano/node/unchecked_map.cpp index 1a4ea823..d00def25 100644 --- a/nano/node/unchecked_map.cpp +++ b/nano/node/unchecked_map.cpp @@ -22,113 +22,66 @@ nano::unchecked_map::~unchecked_map () void nano::unchecked_map::put (nano::hash_or_account const & dependency, nano::unchecked_info const & info) { - nano::unique_lock lock{ mutex }; - buffer.push_back (std::make_pair (dependency, info)); - lock.unlock (); + nano::lock_guard lock{ entries_mutex }; + nano::unchecked_key key{ dependency, info.block->hash () }; + entries.get ().insert ({ key, info }); + if (entries.size () > mem_block_count_max) + { + entries.get ().pop_front (); + } stats.inc (nano::stat::type::unchecked, nano::stat::detail::put); - condition.notify_all (); // Notify run () } -void nano::unchecked_map::for_each ( -nano::transaction const & transaction, std::function action, std::function predicate) +void nano::unchecked_map::for_each (std::function action, std::function predicate) { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) + for (auto i = entries.begin (), n = entries.end (); predicate () && i != n; ++i) { - for (auto [i, n] = store.unchecked.full_range (transaction); predicate () && i != n; ++i) - { - action (i->first, i->second); - } - } - else - { - for (auto i = entries->begin (), n = entries->end (); predicate () && i != n; ++i) - { - action (i->key, i->info); - } + action (i->key, i->info); } } -void nano::unchecked_map::for_each ( -nano::transaction const & transaction, nano::hash_or_account const & dependency, std::function action, std::function predicate) +void nano::unchecked_map::for_each (nano::hash_or_account const & dependency, std::function action, std::function predicate) { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) + for (auto i = entries.template get ().lower_bound (nano::unchecked_key{ dependency, 0 }), n = entries.template get ().end (); predicate () && i != n && i->key.key () == dependency.as_block_hash (); ++i) { - for (auto [i, n] = store.unchecked.equal_range (transaction, dependency.as_block_hash ()); predicate () && i->first.key () == dependency.as_block_hash () && i != n; ++i) - { - action (i->first, i->second); - } - } - else - { - for (auto i = entries->template get ().lower_bound (nano::unchecked_key{ dependency, 0 }), n = entries->template get ().end (); predicate () && i != n && i->key.key () == dependency.as_block_hash (); ++i) - { - action (i->key, i->info); - } + action (i->key, i->info); } } -std::vector nano::unchecked_map::get (nano::transaction const & transaction, nano::block_hash const & hash) +std::vector nano::unchecked_map::get (nano::block_hash const & hash) { std::vector result; - for_each (transaction, hash, [&result] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + for_each (hash, [&result] (nano::unchecked_key const & key, nano::unchecked_info const & info) { result.push_back (info); }); return result; } -bool nano::unchecked_map::exists (nano::transaction const & transaction, nano::unchecked_key const & key) const +bool nano::unchecked_map::exists (nano::unchecked_key const & key) const { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) - { - return store.unchecked.exists (transaction, key); - } - else - { - return entries->template get ().count (key) != 0; - } + return entries.get ().count (key) != 0; } -void nano::unchecked_map::del (nano::write_transaction const & transaction, nano::unchecked_key const & key) +void nano::unchecked_map::del (nano::unchecked_key const & key) { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) - { - store.unchecked.del (transaction, key); - } - else - { - auto erased = entries->template get ().erase (key); - release_assert (erased); - } + auto erased = entries.get ().erase (key); + debug_assert (erased); } -void nano::unchecked_map::clear (nano::write_transaction const & transaction) +void nano::unchecked_map::clear () { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) - { - store.unchecked.clear (transaction); - } - else - { - entries->clear (); - } + entries.clear (); } -size_t nano::unchecked_map::count (nano::transaction const & transaction) const +std::size_t nano::unchecked_map::count () const { nano::lock_guard lock{ entries_mutex }; - if (entries == nullptr) - { - return store.unchecked.count (transaction); - } - else - { - return entries->size (); - } + return entries.size (); } void nano::unchecked_map::stop () @@ -152,37 +105,18 @@ void nano::unchecked_map::flush () void nano::unchecked_map::trigger (nano::hash_or_account const & dependency) { nano::unique_lock lock{ mutex }; - buffer.push_back (dependency); - debug_assert (buffer.back ().which () == 1); // which stands for "query". + buffer.emplace_back (dependency); lock.unlock (); stats.inc (nano::stat::type::unchecked, nano::stat::detail::trigger); condition.notify_all (); // Notify run () + stats.inc (nano::stat::type::unchecked, nano::stat::detail::trigger); } -nano::unchecked_map::item_visitor::item_visitor (unchecked_map & unchecked, nano::write_transaction const & transaction) : - unchecked{ unchecked }, - transaction{ transaction } +void nano::unchecked_map::process_queries (decltype (buffer) const & back_buffer) { -} - -void nano::unchecked_map::item_visitor::operator() (insert const & item) -{ - auto const & [dependency, info] = item; - unchecked.insert_impl (transaction, dependency, info); -} - -void nano::unchecked_map::item_visitor::operator() (query const & item) -{ - unchecked.query_impl (transaction, item.hash); -} - -void nano::unchecked_map::write_buffer (decltype (buffer) const & back_buffer) -{ - auto transaction = store.tx_begin_write (); - item_visitor visitor{ *this, transaction }; for (auto const & item : back_buffer) { - boost::apply_visitor (visitor, item); + query_impl (item.hash); } } @@ -197,7 +131,7 @@ void nano::unchecked_map::run () back_buffer.swap (buffer); writing_back_buffer = true; lock.unlock (); - write_buffer (back_buffer); + process_queries (back_buffer); lock.lock (); writing_back_buffer = false; back_buffer.clear (); @@ -212,54 +146,29 @@ void nano::unchecked_map::run () } } -void nano::unchecked_map::insert_impl (nano::write_transaction const & transaction, nano::hash_or_account const & dependency, nano::unchecked_info const & info) +void nano::unchecked_map::query_impl (nano::block_hash const & hash) { - // Check if block dependency has been satisfied while waiting to be placed in the unchecked map - if (store.block.exists (transaction, dependency.as_block_hash ())) - { - satisfied (info); - return; - } - nano::lock_guard lock{ entries_mutex }; - // Check if we should be using memory but the memory container hasn't been constructed i.e. we're transitioning from disk to memory. - if (entries == nullptr && use_memory ()) - { - auto entries_new = std::make_unique (); - for_each ( - transaction, [&entries_new] (nano::unchecked_key const & key, nano::unchecked_info const & info) { entries_new->template get ().insert ({ key, info }); }, [&] () { return entries_new->size () < mem_block_count_max; }); - clear (transaction); - entries = std::move (entries_new); - } - if (entries == nullptr) - { - store.unchecked.put (transaction, dependency, info); - } - else - { - nano::unchecked_key key{ dependency, info.block->hash () }; - entries->template get ().insert ({ key, info }); - while (entries->size () > mem_block_count_max) - { - entries->template get ().pop_front (); - } - } -} - -void nano::unchecked_map::query_impl (nano::write_transaction const & transaction, nano::block_hash const & hash) -{ - nano::lock_guard lock{ entries_mutex }; - std::deque delete_queue; - for_each (transaction, hash, [this, &delete_queue] (nano::unchecked_key const & key, nano::unchecked_info const & info) { + for_each (hash, [this, &delete_queue] (nano::unchecked_key const & key, nano::unchecked_info const & info) { delete_queue.push_back (key); stats.inc (nano::stat::type::unchecked, nano::stat::detail::satisfied); - satisfied (info); + satisfied.notify (info); }); if (!disable_delete) { for (auto const & key : delete_queue) { - del (transaction, key); + del (key); } } } + +std::unique_ptr nano::unchecked_map::collect_container_info (const std::string & name) +{ + nano::lock_guard lock{ mutex }; + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "entries", entries.size (), sizeof (decltype (entries)::value_type) })); + composite->add_component (std::make_unique (container_info{ "queries", buffer.size (), sizeof (decltype (buffer)::value_type) })); + return composite; +} diff --git a/nano/node/unchecked_map.hpp b/nano/node/unchecked_map.hpp index 3042cdc6..cfcb8b59 100644 --- a/nano/node/unchecked_map.hpp +++ b/nano/node/unchecked_map.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -31,60 +32,50 @@ public: void put (nano::hash_or_account const & dependency, nano::unchecked_info const & info); void for_each ( - nano::transaction const & transaction, std::function action, std::function predicate = [] () { return true; }); + std::function action, std::function predicate = [] () { return true; }); void for_each ( - nano::transaction const & transaction, nano::hash_or_account const & dependency, std::function action, std::function predicate = [] () { return true; }); - std::vector get (nano::transaction const &, nano::block_hash const &); - bool exists (nano::transaction const & transaction, nano::unchecked_key const & key) const; - void del (nano::write_transaction const & transaction, nano::unchecked_key const & key); - void clear (nano::write_transaction const & transaction); - size_t count (nano::transaction const & transaction) const; + nano::hash_or_account const & dependency, std::function action, std::function predicate = [] () { return true; }); + std::vector get (nano::block_hash const &); + bool exists (nano::unchecked_key const & key) const; + void del (nano::unchecked_key const & key); + void clear (); + std::size_t count () const; void stop (); void flush (); - std::function use_memory = [] () { return true; }; - -public: // Trigger requested dependencies + /** + * Trigger requested dependencies + */ void trigger (nano::hash_or_account const & dependency); - std::function satisfied{ [] (nano::unchecked_info const &) {} }; + +public: // Events + nano::observer_set satisfied; private: - using insert = std::pair; - using query = nano::hash_or_account; - class item_visitor : boost::static_visitor<> - { - public: - item_visitor (unchecked_map & unchecked, nano::write_transaction const & transaction); - void operator() (insert const & item); - void operator() (query const & item); - unchecked_map & unchecked; - nano::write_transaction const & transaction; - }; void run (); - void insert_impl (nano::write_transaction const & transaction, nano::hash_or_account const & dependency, nano::unchecked_info const & info); - void query_impl (nano::write_transaction const & transaction, nano::block_hash const & hash); + void query_impl (nano::block_hash const & hash); + +private: // Dependencies nano::store & store; nano::stats & stats; private: bool const & disable_delete; - std::deque> buffer; - std::deque> back_buffer; + std::deque buffer; + std::deque back_buffer; bool writing_back_buffer{ false }; bool stopped{ false }; nano::condition_variable condition; nano::mutex mutex; std::thread thread; - void write_buffer (decltype (buffer) const & back_buffer); - static size_t constexpr mem_block_count_max = 256'000; + void process_queries (decltype (buffer) const & back_buffer); - friend class item_visitor; + static std::size_t constexpr mem_block_count_max = 64 * 1024; -private: // In memory store - class entry +private: + struct entry { - public: nano::unchecked_key key; nano::unchecked_info info; }; @@ -99,8 +90,11 @@ private: // In memory store mi::ordered_unique, mi::member>>>; // clang-format on - std::unique_ptr entries; + ordered_unchecked entries; mutable std::recursive_mutex entries_mutex; + +public: // Container info + std::unique_ptr collect_container_info (std::string const & name); }; } diff --git a/nano/qt/qt.cpp b/nano/qt/qt.cpp index 0f5951b2..457d57c7 100644 --- a/nano/qt/qt.cpp +++ b/nano/qt/qt.cpp @@ -923,7 +923,7 @@ std::string nano_qt::status::text () std::string count_string; { auto size (wallet.wallet_m->wallets.node.ledger.cache.block_count.load ()); - unchecked = wallet.wallet_m->wallets.node.unchecked.count (wallet.wallet_m->wallets.node.store.tx_begin_read ()); + unchecked = wallet.wallet_m->wallets.node.unchecked.count (); cemented = wallet.wallet_m->wallets.node.ledger.cache.cemented_count.load (); count_string = std::to_string (size); } diff --git a/nano/rpc/rpc_handler.cpp b/nano/rpc/rpc_handler.cpp index 69e95d00..7148d45d 100644 --- a/nano/rpc/rpc_handler.cpp +++ b/nano/rpc/rpc_handler.cpp @@ -153,6 +153,7 @@ std::unordered_set create_rpc_control_impls () set.emplace ("account_remove"); set.emplace ("account_representative_set"); set.emplace ("accounts_create"); + set.emplace ("backoff_info"); set.emplace ("block_create"); set.emplace ("bootstrap_lazy"); set.emplace ("confirmation_height_currently_processing"); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index d247645d..fb11dd69 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -6094,7 +6094,7 @@ TEST (rpc, unchecked) node->process_active (open); node->process_active (open2); // Waits for the last block of the queue to get saved in the database - ASSERT_TIMELY (10s, 2 == node->unchecked.count (node->store.tx_begin_read ())); + ASSERT_TIMELY (10s, 2 == node->unchecked.count ()); boost::property_tree::ptree request; request.put ("action", "unchecked"); request.put ("count", 2); @@ -6135,7 +6135,7 @@ TEST (rpc, unchecked_get) .build_shared (); node->process_active (open); // Waits for the open block to get saved in the database - ASSERT_TIMELY (10s, 1 == node->unchecked.count (node->store.tx_begin_read ())); + ASSERT_TIMELY (10s, 1 == node->unchecked.count ()); boost::property_tree::ptree request{}; request.put ("action", "unchecked_get"); request.put ("hash", open->hash ().to_string ()); @@ -6175,12 +6175,12 @@ TEST (rpc, unchecked_clear) node->process_active (open); boost::property_tree::ptree request{}; // Waits for the open block to get saved in the database - ASSERT_TIMELY (10s, 1 == node->unchecked.count (node->store.tx_begin_read ())); + ASSERT_TIMELY (10s, 1 == node->unchecked.count ()); request.put ("action", "unchecked_clear"); auto response = wait_response (system, rpc_ctx, request); // Waits for the open block to get saved in the database - ASSERT_TIMELY (10s, 0 == node->unchecked.count (node->store.tx_begin_read ())); + ASSERT_TIMELY (10s, 0 == node->unchecked.count ()); } TEST (rpc, unopened) diff --git a/nano/slow_test/CMakeLists.txt b/nano/slow_test/CMakeLists.txt index 564af6e0..c5bbedda 100644 --- a/nano/slow_test/CMakeLists.txt +++ b/nano/slow_test/CMakeLists.txt @@ -1,4 +1,5 @@ -add_executable(slow_test entry.cpp node.cpp vote_cache.cpp vote_processor.cpp) +add_executable(slow_test entry.cpp node.cpp vote_cache.cpp vote_processor.cpp + bootstrap.cpp) target_link_libraries(slow_test secure node test_common gtest libminiupnpc-static) diff --git a/nano/slow_test/bootstrap.cpp b/nano/slow_test/bootstrap.cpp new file mode 100644 index 00000000..63095e8d --- /dev/null +++ b/nano/slow_test/bootstrap.cpp @@ -0,0 +1,200 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include +#include + +using namespace std::chrono_literals; + +namespace +{ +void wait_for_key () +{ + int junk; + std::cin >> junk; +} + +class rpc_wrapper +{ +public: + rpc_wrapper (nano::test::system & system, nano::node & node, uint16_t port) : + node_rpc_config{}, + rpc_config{ node.network_params.network, port, true }, + ipc{ node, node_rpc_config }, + ipc_rpc_processor{ system.io_ctx, rpc_config }, + rpc{ system.io_ctx, rpc_config, ipc_rpc_processor } + { + } + + void start () + { + rpc.start (); + } + +public: + nano::node_rpc_config node_rpc_config; + nano::rpc_config rpc_config; + nano::ipc::ipc_server ipc; + nano::ipc_rpc_processor ipc_rpc_processor; + nano::rpc rpc; +}; + +std::unique_ptr start_rpc (nano::test::system & system, nano::node & node, uint16_t port) +{ + auto rpc = std::make_unique (system, node, port); + rpc->start (); + return rpc; +} +} + +TEST (bootstrap_ascending, profile) +{ + nano::test::system system; + nano::thread_runner runner{ system.io_ctx, 2 }; + nano::networks network = nano::networks::nano_beta_network; + nano::network_params network_params{ network }; + + // Set up client and server nodes + nano::node_config config_server{ network_params }; + config_server.preconfigured_peers.clear (); + config_server.bandwidth_limit = 0; // Unlimited server bandwidth + nano::node_flags flags_server; + flags_server.disable_legacy_bootstrap = true; + flags_server.disable_wallet_bootstrap = true; + flags_server.disable_add_initial_peers = true; + flags_server.disable_ongoing_bootstrap = true; + flags_server.disable_ascending_bootstrap = true; + auto data_path_server = nano::working_path (network); + //auto data_path_server = ""; + auto server = std::make_shared (system.io_ctx, data_path_server, config_server, system.work, flags_server); + system.nodes.push_back (server); + server->start (); + + nano::node_config config_client{ network_params }; + config_client.preconfigured_peers.clear (); + config_client.bandwidth_limit = 0; // Unlimited server bandwidth + nano::node_flags flags_client; + flags_client.disable_legacy_bootstrap = true; + flags_client.disable_wallet_bootstrap = true; + flags_client.disable_add_initial_peers = true; + flags_client.disable_ongoing_bootstrap = true; + config_client.ipc_config.transport_tcp.enabled = true; + // Disable database integrity safety for higher throughput + config_client.lmdb_config.sync = nano::lmdb_config::sync_strategy::nosync_unsafe; + //auto client = system.add_node (config_client, flags_client); + + // macos 16GB RAM disk: diskutil erasevolume HFS+ "RAMDisk" `hdiutil attach -nomount ram://33554432` + //auto data_path_client = "/Volumes/RAMDisk"; + auto data_path_client = nano::unique_path (); + auto client = std::make_shared (system.io_ctx, data_path_client, config_client, system.work, flags_client); + system.nodes.push_back (client); + client->start (); + + // Set up RPC + auto client_rpc = start_rpc (system, *server, 55000); + auto server_rpc = start_rpc (system, *client, 55001); + + struct entry + { + nano::bootstrap_ascending::async_tag tag; + std::shared_ptr request_channel; + std::shared_ptr reply_channel; + + bool replied{ false }; + bool received{ false }; + }; + + nano::mutex mutex; + std::unordered_map requests; + + server->bootstrap_server.on_response.add ([&] (auto & response, auto & channel) { + nano::lock_guard lock{ mutex }; + + if (requests.count (response.id)) + { + requests[response.id].replied = true; + requests[response.id].reply_channel = channel; + } + else + { + std::cerr << "unknown response: " << response.id << std::endl; + } + }); + + client->ascendboot.on_request.add ([&] (auto & tag, auto & channel) { + nano::lock_guard lock{ mutex }; + + requests[tag.id] = { tag, channel }; + }); + + client->ascendboot.on_reply.add ([&] (auto & tag) { + nano::lock_guard lock{ mutex }; + + requests[tag.id].received = true; + }); + + /*client->ascendboot.on_timeout.add ([&] (auto & tag) { + nano::lock_guard lock{ mutex }; + + if (requests.count (tag.id)) + { + auto entry = requests[tag.id]; + + std::cerr << "timeout: " + << "replied: " << entry.replied + << " | " + << "recevied: " << entry.received + << " | " + << "request: " << entry.request_channel->to_string () + << " ||| " + << "reply: " << (entry.reply_channel ? entry.reply_channel->to_string () : "null") + << std::endl; + } + else + { + std::cerr << "unknown timeout: " << tag.id << std::endl; + } + });*/ + + std::cout << "server count: " << server->ledger.cache.block_count << std::endl; + + nano::test::rate_observer rate; + rate.observe ("count", [&] () { return client->ledger.cache.block_count.load (); }); + rate.observe ("unchecked", [&] () { return client->unchecked.count (); }); + rate.observe ("block_processor", [&] () { return client->block_processor.size (); }); + rate.observe ("priority", [&] () { return client->ascendboot.priority_size (); }); + rate.observe ("blocking", [&] () { return client->ascendboot.blocked_size (); }); + rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out); + rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::reply, nano::stat::dir::in); + rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in); + rate.observe (*server, nano::stat::type::bootstrap_server, nano::stat::detail::blocks, nano::stat::dir::out); + rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::old, nano::stat::dir::in); + rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::gap_epoch_open_pending, nano::stat::dir::in); + rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::gap_source, nano::stat::dir::in); + rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::gap_previous, nano::stat::dir::in); + rate.background_print (3s); + + //wait_for_key (); + while (true) + { + nano::test::establish_tcp (system, *client, server->network.endpoint ()); + std::this_thread::sleep_for (10s); + } + + server->stop (); + client->stop (); +} diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index f6fedfab..9200416a 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -510,7 +510,7 @@ TEST (store, unchecked_load) node.unchecked.put (i, block); } // Waits for all the blocks to get saved in the database - ASSERT_TIMELY (8000s, num_unchecked == node.unchecked.count (node.store.tx_begin_read ())); + ASSERT_TIMELY (8000s, num_unchecked == node.unchecked.count ()); } TEST (store, vote_load)