Merge pull request #4158 from clemahieu/bootstrap_ascending_client
Merge the ascending bootstrap feature branch.
This commit is contained in:
commit
022a10f24f
37 changed files with 1930 additions and 266 deletions
|
@ -9,6 +9,7 @@ add_executable(
|
|||
block_store.cpp
|
||||
blockprocessor.cpp
|
||||
bootstrap.cpp
|
||||
bootstrap_ascending.cpp
|
||||
bootstrap_server.cpp
|
||||
cli.cpp
|
||||
confirmation_height.cpp
|
||||
|
|
|
@ -450,9 +450,8 @@ TEST (block_store, empty_bootstrap)
|
|||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
nano::unchecked_map unchecked{ *store, system.stats, false };
|
||||
ASSERT_TRUE (!store->init_error ());
|
||||
auto transaction (store->tx_begin_read ());
|
||||
size_t count = 0;
|
||||
unchecked.for_each (transaction, [&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
unchecked.for_each ([&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
++count;
|
||||
});
|
||||
ASSERT_EQ (count, 0);
|
||||
|
|
|
@ -127,12 +127,12 @@ TEST (bulk_pull, ascending_one_hash)
|
|||
auto connection = std::make_shared<nano::transport::tcp_server> (socket, system.nodes[0]);
|
||||
auto req = std::make_unique<nano::bulk_pull> (nano::dev::network_params.network);
|
||||
req->start = nano::dev::genesis->hash ();
|
||||
req->end = nano::dev::genesis->hash ();
|
||||
req->end.clear ();
|
||||
req->header.flag_set (nano::message_header::bulk_pull_ascending_flag);
|
||||
auto request = std::make_shared<nano::bulk_pull_server> (connection, std::move (req));
|
||||
auto block_out1 = request->get_next ();
|
||||
ASSERT_NE (nullptr, block_out1);
|
||||
ASSERT_EQ (block_out1->hash (), nano::dev::genesis->hash ());
|
||||
ASSERT_EQ (block_out1->hash (), block1->hash ());
|
||||
ASSERT_EQ (nullptr, request->get_next ());
|
||||
}
|
||||
|
||||
|
@ -158,7 +158,7 @@ TEST (bulk_pull, ascending_two_account)
|
|||
auto socket = std::make_shared<nano::transport::socket> (node, nano::transport::socket::endpoint_type_t::server);
|
||||
auto connection = std::make_shared<nano::transport::tcp_server> (socket, system.nodes[0]);
|
||||
auto req = std::make_unique<nano::bulk_pull> (nano::dev::network_params.network);
|
||||
req->start = nano::dev::genesis->hash ();
|
||||
req->start = nano::dev::genesis->account ();
|
||||
req->end.clear ();
|
||||
req->header.flag_set (nano::message_header::bulk_pull_ascending_flag);
|
||||
auto request = std::make_shared<nano::bulk_pull_server> (connection, std::move (req));
|
||||
|
@ -172,7 +172,7 @@ TEST (bulk_pull, ascending_two_account)
|
|||
}
|
||||
|
||||
/**
|
||||
Tests that the `end' value is respected in the bulk_pull message
|
||||
Tests that the `end' value is respected in the bulk_pull message when the ascending flag is used.
|
||||
*/
|
||||
TEST (bulk_pull, ascending_end)
|
||||
{
|
||||
|
@ -1351,7 +1351,7 @@ TEST (bootstrap_processor, lazy_pruning_missing_block)
|
|||
ASSERT_FALSE (node2->ledger.block_or_pruned_exists (state_open->hash ()));
|
||||
{
|
||||
auto transaction (node2->store.tx_begin_read ());
|
||||
ASSERT_TRUE (node2->unchecked.exists (transaction, nano::unchecked_key (send2->root ().as_block_hash (), send2->hash ())));
|
||||
ASSERT_TRUE (node2->unchecked.exists (nano::unchecked_key (send2->root ().as_block_hash (), send2->hash ())));
|
||||
}
|
||||
// Insert missing block
|
||||
node2->process_active (send1);
|
||||
|
@ -2045,7 +2045,7 @@ TEST (bulk, DISABLED_genesis_pruning)
|
|||
ASSERT_EQ (1, node2->ledger.cache.block_count);
|
||||
{
|
||||
auto transaction (node2->store.tx_begin_write ());
|
||||
node2->unchecked.clear (transaction);
|
||||
node2->unchecked.clear ();
|
||||
}
|
||||
// Insert pruned blocks
|
||||
node2->process_active (send1);
|
||||
|
|
252
nano/core_test/bootstrap_ascending.cpp
Normal file
252
nano/core_test/bootstrap_ascending.cpp
Normal file
|
@ -0,0 +1,252 @@
|
|||
#include <nano/lib/stats.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
||||
#include <nano/test_common/system.hpp>
|
||||
#include <nano/test_common/testutil.hpp>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace
|
||||
{
|
||||
nano::block_hash random_hash ()
|
||||
{
|
||||
nano::block_hash random_hash;
|
||||
nano::random_pool::generate_block (random_hash.bytes.data (), random_hash.bytes.size ());
|
||||
return random_hash;
|
||||
}
|
||||
}
|
||||
|
||||
TEST (account_sets, construction)
|
||||
{
|
||||
nano::stats stats;
|
||||
nano::logger_mt logger;
|
||||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
ASSERT_FALSE (store->init_error ());
|
||||
nano::bootstrap_ascending::account_sets sets{ stats };
|
||||
}
|
||||
|
||||
TEST (account_sets, empty_blocked)
|
||||
{
|
||||
nano::account account{ 1 };
|
||||
nano::stats stats;
|
||||
nano::logger_mt logger;
|
||||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
ASSERT_FALSE (store->init_error ());
|
||||
nano::bootstrap_ascending::account_sets sets{ stats };
|
||||
ASSERT_FALSE (sets.blocked (account));
|
||||
}
|
||||
|
||||
TEST (account_sets, block)
|
||||
{
|
||||
nano::account account{ 1 };
|
||||
nano::stats stats;
|
||||
nano::logger_mt logger;
|
||||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
ASSERT_FALSE (store->init_error ());
|
||||
nano::bootstrap_ascending::account_sets sets{ stats };
|
||||
sets.block (account, random_hash ());
|
||||
ASSERT_TRUE (sets.blocked (account));
|
||||
}
|
||||
|
||||
TEST (account_sets, unblock)
|
||||
{
|
||||
nano::account account{ 1 };
|
||||
nano::stats stats;
|
||||
nano::logger_mt logger;
|
||||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
ASSERT_FALSE (store->init_error ());
|
||||
nano::bootstrap_ascending::account_sets sets{ stats };
|
||||
auto hash = random_hash ();
|
||||
sets.block (account, hash);
|
||||
sets.unblock (account, hash);
|
||||
ASSERT_FALSE (sets.blocked (account));
|
||||
}
|
||||
|
||||
TEST (account_sets, priority_base)
|
||||
{
|
||||
nano::account account{ 1 };
|
||||
nano::stats stats;
|
||||
nano::logger_mt logger;
|
||||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
ASSERT_FALSE (store->init_error ());
|
||||
nano::bootstrap_ascending::account_sets sets{ stats };
|
||||
ASSERT_EQ (1.0f, sets.priority (account));
|
||||
}
|
||||
|
||||
TEST (account_sets, priority_blocked)
|
||||
{
|
||||
nano::account account{ 1 };
|
||||
nano::stats stats;
|
||||
nano::logger_mt logger;
|
||||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
ASSERT_FALSE (store->init_error ());
|
||||
nano::bootstrap_ascending::account_sets sets{ stats };
|
||||
sets.block (account, random_hash ());
|
||||
ASSERT_EQ (0.0f, sets.priority (account));
|
||||
}
|
||||
|
||||
// When account is unblocked, check that it retains it former priority
|
||||
TEST (account_sets, priority_unblock_keep)
|
||||
{
|
||||
nano::account account{ 1 };
|
||||
nano::stats stats;
|
||||
nano::logger_mt logger;
|
||||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
ASSERT_FALSE (store->init_error ());
|
||||
nano::bootstrap_ascending::account_sets sets{ stats };
|
||||
sets.priority_up (account);
|
||||
sets.priority_up (account);
|
||||
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial * nano::bootstrap_ascending::account_sets::priority_increase);
|
||||
auto hash = random_hash ();
|
||||
sets.block (account, hash);
|
||||
ASSERT_EQ (0.0f, sets.priority (account));
|
||||
sets.unblock (account, hash);
|
||||
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial * nano::bootstrap_ascending::account_sets::priority_increase);
|
||||
}
|
||||
|
||||
TEST (account_sets, priority_up_down)
|
||||
{
|
||||
nano::account account{ 1 };
|
||||
nano::stats stats;
|
||||
nano::logger_mt logger;
|
||||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
ASSERT_FALSE (store->init_error ());
|
||||
nano::bootstrap_ascending::account_sets sets{ stats };
|
||||
sets.priority_up (account);
|
||||
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial);
|
||||
sets.priority_down (account);
|
||||
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial - nano::bootstrap_ascending::account_sets::priority_decrease);
|
||||
}
|
||||
|
||||
// Check that priority downward saturates to 1.0f
|
||||
TEST (account_sets, priority_down_sat)
|
||||
{
|
||||
nano::account account{ 1 };
|
||||
nano::stats stats;
|
||||
nano::logger_mt logger;
|
||||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
ASSERT_FALSE (store->init_error ());
|
||||
nano::bootstrap_ascending::account_sets sets{ stats };
|
||||
sets.priority_down (account);
|
||||
ASSERT_EQ (1.0f, sets.priority (account));
|
||||
}
|
||||
|
||||
// Ensure priority value is bounded
|
||||
TEST (account_sets, saturate_priority)
|
||||
{
|
||||
nano::account account{ 1 };
|
||||
nano::stats stats;
|
||||
nano::logger_mt logger;
|
||||
auto store = nano::make_store (logger, nano::unique_path (), nano::dev::constants);
|
||||
ASSERT_FALSE (store->init_error ());
|
||||
nano::bootstrap_ascending::account_sets sets{ stats };
|
||||
for (int n = 0; n < 1000; ++n)
|
||||
{
|
||||
sets.priority_up (account);
|
||||
}
|
||||
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_max);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the base case for returning
|
||||
*/
|
||||
TEST (bootstrap_ascending, account_base)
|
||||
{
|
||||
nano::node_flags flags;
|
||||
nano::test::system system{ 1, nano::transport::transport_type::tcp, flags };
|
||||
auto & node0 = *system.nodes[0];
|
||||
nano::state_block_builder builder;
|
||||
auto send1 = builder.make_block ()
|
||||
.account (nano::dev::genesis_key.pub)
|
||||
.previous (nano::dev::genesis->hash ())
|
||||
.representative (nano::dev::genesis_key.pub)
|
||||
.link (0)
|
||||
.balance (nano::dev::constants.genesis_amount - 1)
|
||||
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
|
||||
.work (*system.work.generate (nano::dev::genesis->hash ()))
|
||||
.build_shared ();
|
||||
ASSERT_EQ (nano::process_result::progress, node0.process (*send1).code);
|
||||
auto & node1 = *system.add_node (flags);
|
||||
ASSERT_TIMELY (5s, node1.block (send1->hash ()) != nullptr);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that bootstrap_ascending will return multiple new blocks in-order
|
||||
*/
|
||||
TEST (bootstrap_ascending, account_inductive)
|
||||
{
|
||||
nano::node_flags flags;
|
||||
nano::test::system system{ 1, nano::transport::transport_type::tcp, flags };
|
||||
auto & node0 = *system.nodes[0];
|
||||
nano::state_block_builder builder;
|
||||
auto send1 = builder.make_block ()
|
||||
.account (nano::dev::genesis_key.pub)
|
||||
.previous (nano::dev::genesis->hash ())
|
||||
.representative (nano::dev::genesis_key.pub)
|
||||
.link (0)
|
||||
.balance (nano::dev::constants.genesis_amount - 1)
|
||||
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
|
||||
.work (*system.work.generate (nano::dev::genesis->hash ()))
|
||||
.build_shared ();
|
||||
auto send2 = builder.make_block ()
|
||||
.account (nano::dev::genesis_key.pub)
|
||||
.previous (send1->hash ())
|
||||
.representative (nano::dev::genesis_key.pub)
|
||||
.link (0)
|
||||
.balance (nano::dev::constants.genesis_amount - 2)
|
||||
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
|
||||
.work (*system.work.generate (send1->hash ()))
|
||||
.build_shared ();
|
||||
// std::cerr << "Genesis: " << nano::dev::genesis->hash ().to_string () << std::endl;
|
||||
// std::cerr << "Send1: " << send1->hash ().to_string () << std::endl;
|
||||
// std::cerr << "Send2: " << send2->hash ().to_string () << std::endl;
|
||||
ASSERT_EQ (nano::process_result::progress, node0.process (*send1).code);
|
||||
ASSERT_EQ (nano::process_result::progress, node0.process (*send2).code);
|
||||
auto & node1 = *system.add_node (flags);
|
||||
ASSERT_TIMELY (50s, node1.block (send2->hash ()) != nullptr);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that bootstrap_ascending will return multiple new blocks in-order
|
||||
*/
|
||||
TEST (bootstrap_ascending, trace_base)
|
||||
{
|
||||
nano::node_flags flags;
|
||||
flags.disable_legacy_bootstrap = true;
|
||||
nano::test::system system{ 1, nano::transport::transport_type::tcp, flags };
|
||||
auto & node0 = *system.nodes[0];
|
||||
nano::keypair key;
|
||||
nano::state_block_builder builder;
|
||||
auto send1 = builder.make_block ()
|
||||
.account (nano::dev::genesis_key.pub)
|
||||
.previous (nano::dev::genesis->hash ())
|
||||
.representative (nano::dev::genesis_key.pub)
|
||||
.link (key.pub)
|
||||
.balance (nano::dev::constants.genesis_amount - 1)
|
||||
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
|
||||
.work (*system.work.generate (nano::dev::genesis->hash ()))
|
||||
.build_shared ();
|
||||
auto receive1 = builder.make_block ()
|
||||
.account (key.pub)
|
||||
.previous (0)
|
||||
.representative (nano::dev::genesis_key.pub)
|
||||
.link (send1->hash ())
|
||||
.balance (1)
|
||||
.sign (key.prv, key.pub)
|
||||
.work (*system.work.generate (key.pub))
|
||||
.build_shared ();
|
||||
// std::cerr << "Genesis key: " << nano::dev::genesis_key.pub.to_account () << std::endl;
|
||||
// std::cerr << "Key: " << key.pub.to_account () << std::endl;
|
||||
// std::cerr << "Genesis: " << nano::dev::genesis->hash ().to_string () << std::endl;
|
||||
// std::cerr << "send1: " << send1->hash ().to_string () << std::endl;
|
||||
// std::cerr << "receive1: " << receive1->hash ().to_string () << std::endl;
|
||||
auto & node1 = *system.add_node ();
|
||||
// std::cerr << "--------------- Start ---------------\n";
|
||||
ASSERT_EQ (nano::process_result::progress, node0.process (*send1).code);
|
||||
ASSERT_EQ (nano::process_result::progress, node0.process (*receive1).code);
|
||||
ASSERT_EQ (node1.store.pending.begin (node1.store.tx_begin_read (), nano::pending_key{ key.pub, 0 }), node1.store.pending.end ());
|
||||
// std::cerr << "node0: " << node0.network.endpoint () << std::endl;
|
||||
// std::cerr << "node1: " << node1.network.endpoint () << std::endl;
|
||||
ASSERT_TIMELY (10s, node1.block (receive1->hash ()) != nullptr);
|
||||
}
|
|
@ -391,14 +391,14 @@ TEST (confirmation_height, gap_bootstrap)
|
|||
node1.process_active (receive2);
|
||||
// Waits for the unchecked_map to process the 4 blocks added to the block_processor, saving them in the unchecked table
|
||||
auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) {
|
||||
return !node1.unchecked.get (transaction_a, block_hash_a).empty ();
|
||||
return !node1.unchecked.get (block_hash_a).empty ();
|
||||
};
|
||||
ASSERT_TIMELY (5s, check_block_is_listed (node1.store.tx_begin_read (), receive2->previous ()));
|
||||
|
||||
// Confirmation heights should not be updated
|
||||
{
|
||||
auto transaction (node1.store.tx_begin_read ());
|
||||
auto unchecked_count (node1.unchecked.count (transaction));
|
||||
auto unchecked_count (node1.unchecked.count ());
|
||||
ASSERT_EQ (unchecked_count, 2);
|
||||
|
||||
nano::confirmation_height_info confirmation_height_info;
|
||||
|
@ -410,7 +410,7 @@ TEST (confirmation_height, gap_bootstrap)
|
|||
// Now complete the chain where the block comes in on the bootstrap network.
|
||||
node1.block_processor.add (open1);
|
||||
|
||||
ASSERT_TIMELY (5s, node1.unchecked.count (node1.store.tx_begin_read ()) == 0);
|
||||
ASSERT_TIMELY (5s, node1.unchecked.count () == 0);
|
||||
// Confirmation height should be unchanged and unchecked should now be 0
|
||||
{
|
||||
auto transaction = node1.store.tx_begin_read ();
|
||||
|
@ -544,7 +544,7 @@ TEST (confirmation_height, gap_live)
|
|||
|
||||
// This should confirm the open block and the source of the receive blocks
|
||||
auto transaction = node->store.tx_begin_read ();
|
||||
auto unchecked_count = node->unchecked.count (transaction);
|
||||
auto unchecked_count = node->unchecked.count ();
|
||||
ASSERT_EQ (unchecked_count, 0);
|
||||
|
||||
nano::confirmation_height_info confirmation_height_info{};
|
||||
|
|
|
@ -3999,10 +3999,10 @@ TEST (ledger, epoch_open_pending)
|
|||
ASSERT_EQ (nano::process_result::gap_epoch_open_pending, process_result.code);
|
||||
node1.block_processor.add (epoch_open);
|
||||
// Waits for the block to get saved in the database
|
||||
ASSERT_TIMELY (10s, 1 == node1.unchecked.count (node1.store.tx_begin_read ()));
|
||||
ASSERT_TIMELY (10s, 1 == node1.unchecked.count ());
|
||||
ASSERT_FALSE (node1.ledger.block_or_pruned_exists (epoch_open->hash ()));
|
||||
// Open block should be inserted into unchecked
|
||||
auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), nano::hash_or_account (epoch_open->account ()).hash);
|
||||
auto blocks = node1.unchecked.get (nano::hash_or_account (epoch_open->account ()).hash);
|
||||
ASSERT_EQ (blocks.size (), 1);
|
||||
ASSERT_EQ (blocks[0].block->full_hash (), epoch_open->full_hash ());
|
||||
// New block to process epoch open
|
||||
|
@ -4293,8 +4293,8 @@ TEST (ledger, unchecked_epoch)
|
|||
node1.block_processor.add (epoch1);
|
||||
{
|
||||
// Waits for the epoch1 block to pass through block_processor and unchecked.put queues
|
||||
ASSERT_TIMELY (10s, 1 == node1.unchecked.count (node1.store.tx_begin_read ()));
|
||||
auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), epoch1->previous ());
|
||||
ASSERT_TIMELY (10s, 1 == node1.unchecked.count ());
|
||||
auto blocks = node1.unchecked.get (epoch1->previous ());
|
||||
ASSERT_EQ (blocks.size (), 1);
|
||||
}
|
||||
node1.block_processor.add (send1);
|
||||
|
@ -4302,7 +4302,7 @@ TEST (ledger, unchecked_epoch)
|
|||
ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), epoch1->hash ()));
|
||||
{
|
||||
// Waits for the last blocks to pass through block_processor and unchecked.put queues
|
||||
ASSERT_TIMELY (10s, 0 == node1.unchecked.count (node1.store.tx_begin_read ()));
|
||||
ASSERT_TIMELY (10s, 0 == node1.unchecked.count ());
|
||||
auto info = node1.ledger.account_info (node1.store.tx_begin_read (), destination.pub);
|
||||
ASSERT_TRUE (info);
|
||||
ASSERT_EQ (info->epoch (), nano::epoch::epoch_1);
|
||||
|
@ -4367,8 +4367,8 @@ TEST (ledger, unchecked_epoch_invalid)
|
|||
node1.block_processor.add (epoch2);
|
||||
{
|
||||
// Waits for the last blocks to pass through block_processor and unchecked.put queues
|
||||
ASSERT_TIMELY (10s, 2 == node1.unchecked.count (node1.store.tx_begin_read ()));
|
||||
auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), epoch1->previous ());
|
||||
ASSERT_TIMELY (10s, 2 == node1.unchecked.count ());
|
||||
auto blocks = node1.unchecked.get (epoch1->previous ());
|
||||
ASSERT_EQ (blocks.size (), 2);
|
||||
}
|
||||
node1.block_processor.add (send1);
|
||||
|
@ -4378,9 +4378,9 @@ TEST (ledger, unchecked_epoch_invalid)
|
|||
{
|
||||
auto transaction = node1.store.tx_begin_read ();
|
||||
ASSERT_FALSE (node1.store.block.exists (transaction, epoch1->hash ()));
|
||||
auto unchecked_count = node1.unchecked.count (transaction);
|
||||
auto unchecked_count = node1.unchecked.count ();
|
||||
ASSERT_EQ (unchecked_count, 0);
|
||||
ASSERT_EQ (unchecked_count, node1.unchecked.count (transaction));
|
||||
ASSERT_EQ (unchecked_count, node1.unchecked.count ());
|
||||
auto info = node1.ledger.account_info (transaction, destination.pub);
|
||||
ASSERT_TRUE (info);
|
||||
ASSERT_NE (info->epoch (), nano::epoch::epoch_1);
|
||||
|
@ -4434,15 +4434,15 @@ TEST (ledger, unchecked_open)
|
|||
node1.block_processor.add (open1);
|
||||
{
|
||||
// Waits for the last blocks to pass through block_processor and unchecked.put queues
|
||||
ASSERT_TIMELY (5s, 1 == node1.unchecked.count (node1.store.tx_begin_read ()));
|
||||
// When open1 existists in unchecked, we know open2 has been processed.
|
||||
auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), open1->source ());
|
||||
ASSERT_TIMELY (10s, 1 == node1.unchecked.count ());
|
||||
// Get the next peer for attempting a tcp bootstrap connection
|
||||
auto blocks = node1.unchecked.get (open1->source ());
|
||||
ASSERT_EQ (blocks.size (), 1);
|
||||
}
|
||||
node1.block_processor.add (send1);
|
||||
// Waits for the send1 block to pass through block_processor and unchecked.put queues
|
||||
ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), open1->hash ()));
|
||||
ASSERT_EQ (0, node1.unchecked.count (node1.store.tx_begin_read ()));
|
||||
ASSERT_EQ (0, node1.unchecked.count ());
|
||||
}
|
||||
|
||||
TEST (ledger, unchecked_receive)
|
||||
|
@ -4493,13 +4493,13 @@ TEST (ledger, unchecked_receive)
|
|||
node1.block_processor.add (send1);
|
||||
node1.block_processor.add (receive1);
|
||||
auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) {
|
||||
return !node1.unchecked.get (transaction_a, block_hash_a).empty ();
|
||||
return !node1.unchecked.get (block_hash_a).empty ();
|
||||
};
|
||||
// Previous block for receive1 is unknown, signature cannot be validated
|
||||
{
|
||||
// Waits for the last blocks to pass through block_processor and unchecked.put queues
|
||||
ASSERT_TIMELY (15s, check_block_is_listed (node1.store.tx_begin_read (), receive1->previous ()));
|
||||
auto blocks = node1.unchecked.get (node1.store.tx_begin_read (), receive1->previous ());
|
||||
auto blocks = node1.unchecked.get (receive1->previous ());
|
||||
ASSERT_EQ (blocks.size (), 1);
|
||||
}
|
||||
// Waits for the open1 block to pass through block_processor and unchecked.put queues
|
||||
|
@ -4508,12 +4508,12 @@ TEST (ledger, unchecked_receive)
|
|||
// Previous block for receive1 is known, signature was validated
|
||||
{
|
||||
auto transaction = node1.store.tx_begin_read ();
|
||||
auto blocks (node1.unchecked.get (transaction, receive1->source ()));
|
||||
auto blocks (node1.unchecked.get (receive1->source ()));
|
||||
ASSERT_EQ (blocks.size (), 1);
|
||||
}
|
||||
node1.block_processor.add (send2);
|
||||
ASSERT_TIMELY (10s, node1.store.block.exists (node1.store.tx_begin_read (), receive1->hash ()));
|
||||
ASSERT_EQ (0, node1.unchecked.count (node1.store.tx_begin_read ()));
|
||||
ASSERT_EQ (0, node1.unchecked.count ());
|
||||
}
|
||||
|
||||
TEST (ledger, confirmation_height_not_updated)
|
||||
|
@ -5542,8 +5542,7 @@ TEST (ledger, migrate_lmdb_to_rocksdb)
|
|||
uint16_t port = 100;
|
||||
nano::lmdb::store store{ logger, path / "data.ldb", nano::dev::constants };
|
||||
nano::unchecked_map unchecked{ store, system.stats, false };
|
||||
nano::stats stats{};
|
||||
nano::ledger ledger{ store, stats, nano::dev::constants };
|
||||
nano::ledger ledger{ store, system.stats, nano::dev::constants };
|
||||
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
|
||||
|
||||
std::shared_ptr<nano::block> send = nano::state_block_builder ()
|
||||
|
|
|
@ -2984,7 +2984,7 @@ TEST (node, block_processor_signatures)
|
|||
node1.process_active (receive2);
|
||||
node1.process_active (receive3);
|
||||
ASSERT_TIMELY (5s, node1.block (receive2->hash ()) != nullptr); // Implies send1, send2, send3, receive1.
|
||||
ASSERT_TIMELY (5s, node1.unchecked.count (node1.store.tx_begin_read ()) == 0);
|
||||
ASSERT_TIMELY (5s, node1.unchecked.count () == 0);
|
||||
ASSERT_EQ (nullptr, node1.block (receive3->hash ())); // Invalid signer
|
||||
ASSERT_EQ (nullptr, node1.block (send4->hash ())); // Invalid signature via process_active
|
||||
ASSERT_EQ (nullptr, node1.block (send5->hash ())); // Invalid signature via unchecked
|
||||
|
@ -3289,17 +3289,17 @@ TEST (node, unchecked_cleanup)
|
|||
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
|
||||
node.process_active (open);
|
||||
// Waits for the open block to get saved in the database
|
||||
ASSERT_TIMELY (15s, 1 == node.unchecked.count (node.store.tx_begin_read ()));
|
||||
ASSERT_TIMELY (15s, 1 == node.unchecked.count ());
|
||||
node.config.unchecked_cutoff_time = std::chrono::seconds (2);
|
||||
ASSERT_EQ (1, node.unchecked.count (node.store.tx_begin_read ()));
|
||||
ASSERT_EQ (1, node.unchecked.count ());
|
||||
std::this_thread::sleep_for (std::chrono::seconds (1));
|
||||
node.unchecked_cleanup ();
|
||||
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
|
||||
ASSERT_EQ (1, node.unchecked.count (node.store.tx_begin_read ()));
|
||||
ASSERT_EQ (1, node.unchecked.count ());
|
||||
std::this_thread::sleep_for (std::chrono::seconds (2));
|
||||
node.unchecked_cleanup ();
|
||||
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
|
||||
ASSERT_EQ (0, node.unchecked.count (node.store.tx_begin_read ()));
|
||||
ASSERT_EQ (0, node.unchecked.count ());
|
||||
}
|
||||
|
||||
/** This checks that a node can be opened (without being blocked) when a write lock is held elsewhere */
|
||||
|
|
|
@ -73,18 +73,18 @@ TEST (block_store, one_bootstrap)
|
|||
.build_shared ();
|
||||
unchecked.put (block1->hash (), nano::unchecked_info{ block1 });
|
||||
auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) {
|
||||
return unchecked.get (transaction_a, block_hash_a).size () > 0;
|
||||
return unchecked.get (block_hash_a).size () > 0;
|
||||
};
|
||||
// Waits for the block1 to get saved in the database
|
||||
ASSERT_TIMELY (10s, check_block_is_listed (store->tx_begin_read (), block1->hash ()));
|
||||
auto transaction = store->tx_begin_read ();
|
||||
std::vector<nano::block_hash> dependencies;
|
||||
unchecked.for_each (transaction, [&dependencies] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
unchecked.for_each ([&dependencies] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
dependencies.push_back (key.key ());
|
||||
});
|
||||
auto hash1 = dependencies[0];
|
||||
ASSERT_EQ (block1->hash (), hash1);
|
||||
auto blocks = unchecked.get (transaction, hash1);
|
||||
auto blocks = unchecked.get (hash1);
|
||||
ASSERT_EQ (1, blocks.size ());
|
||||
auto block2 = blocks[0].block;
|
||||
ASSERT_EQ (*block1, *block2);
|
||||
|
@ -109,25 +109,24 @@ TEST (unchecked, simple)
|
|||
.work (5)
|
||||
.build_shared ();
|
||||
// Asserts the block wasn't added yet to the unchecked table
|
||||
auto block_listing1 = unchecked.get (store->tx_begin_read (), block->previous ());
|
||||
auto block_listing1 = unchecked.get (block->previous ());
|
||||
ASSERT_TRUE (block_listing1.empty ());
|
||||
// Enqueues a block to be saved on the unchecked table
|
||||
unchecked.put (block->previous (), nano::unchecked_info (block));
|
||||
// Waits for the block to get written in the database
|
||||
auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) {
|
||||
return unchecked.get (transaction_a, block_hash_a).size () > 0;
|
||||
auto check_block_is_listed = [&] (nano::block_hash const & block_hash_a) {
|
||||
return unchecked.get (block_hash_a).size () > 0;
|
||||
};
|
||||
ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->previous ()));
|
||||
auto transaction = store->tx_begin_write ();
|
||||
ASSERT_TIMELY (5s, check_block_is_listed (block->previous ()));
|
||||
// Retrieves the block from the database
|
||||
auto block_listing2 = unchecked.get (transaction, block->previous ());
|
||||
auto block_listing2 = unchecked.get (block->previous ());
|
||||
ASSERT_FALSE (block_listing2.empty ());
|
||||
// Asserts the added block is equal to the retrieved one
|
||||
ASSERT_EQ (*block, *(block_listing2[0].block));
|
||||
// Deletes the block from the database
|
||||
unchecked.del (transaction, nano::unchecked_key (block->previous (), block->hash ()));
|
||||
unchecked.del (nano::unchecked_key (block->previous (), block->hash ()));
|
||||
// Asserts the block is deleted
|
||||
auto block_listing3 = unchecked.get (transaction, block->previous ());
|
||||
auto block_listing3 = unchecked.get (block->previous ());
|
||||
ASSERT_TRUE (block_listing3.empty ());
|
||||
}
|
||||
|
||||
|
@ -154,19 +153,19 @@ TEST (unchecked, multiple)
|
|||
.work (5)
|
||||
.build_shared ();
|
||||
// Asserts the block wasn't added yet to the unchecked table
|
||||
auto block_listing1 = unchecked.get (store->tx_begin_read (), block->previous ());
|
||||
auto block_listing1 = unchecked.get (block->previous ());
|
||||
ASSERT_TRUE (block_listing1.empty ());
|
||||
// Enqueues the first block
|
||||
unchecked.put (block->previous (), nano::unchecked_info (block));
|
||||
// Enqueues a second block
|
||||
unchecked.put (block->source (), nano::unchecked_info (block));
|
||||
auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) {
|
||||
return unchecked.get (transaction_a, block_hash_a).size () > 0;
|
||||
auto check_block_is_listed = [&] (nano::block_hash const & block_hash_a) {
|
||||
return unchecked.get (block_hash_a).size () > 0;
|
||||
};
|
||||
// Waits for and asserts the first block gets saved in the database
|
||||
ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->previous ()));
|
||||
ASSERT_TIMELY (5s, check_block_is_listed (block->previous ()));
|
||||
// Waits for and asserts the second block gets saved in the database
|
||||
ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->source ()));
|
||||
ASSERT_TIMELY (5s, check_block_is_listed (block->source ()));
|
||||
}
|
||||
|
||||
// This test ensures that a block can't occur twice in the unchecked table.
|
||||
|
@ -187,19 +186,19 @@ TEST (unchecked, double_put)
|
|||
.work (5)
|
||||
.build_shared ();
|
||||
// Asserts the block wasn't added yet to the unchecked table
|
||||
auto block_listing1 = unchecked.get (store->tx_begin_read (), block->previous ());
|
||||
auto block_listing1 = unchecked.get (block->previous ());
|
||||
ASSERT_TRUE (block_listing1.empty ());
|
||||
// Enqueues the block to be saved in the unchecked table
|
||||
unchecked.put (block->previous (), nano::unchecked_info (block));
|
||||
// Enqueues the block again in an attempt to have it there twice
|
||||
unchecked.put (block->previous (), nano::unchecked_info (block));
|
||||
auto check_block_is_listed = [&] (nano::transaction const & transaction_a, nano::block_hash const & block_hash_a) {
|
||||
return unchecked.get (transaction_a, block_hash_a).size () > 0;
|
||||
auto check_block_is_listed = [&] (nano::block_hash const & block_hash_a) {
|
||||
return unchecked.get (block_hash_a).size () > 0;
|
||||
};
|
||||
// Waits for and asserts the block was added at least once
|
||||
ASSERT_TIMELY (5s, check_block_is_listed (store->tx_begin_read (), block->previous ()));
|
||||
ASSERT_TIMELY (5s, check_block_is_listed (block->previous ()));
|
||||
// Asserts the block was added at most once -- this is objective of this test.
|
||||
auto block_listing2 = unchecked.get (store->tx_begin_read (), block->previous ());
|
||||
auto block_listing2 = unchecked.get (block->previous ());
|
||||
ASSERT_EQ (block_listing2.size (), 1);
|
||||
}
|
||||
|
||||
|
@ -251,8 +250,7 @@ TEST (unchecked, multiple_get)
|
|||
// we cannot trust the count() method if the backend is rocksdb
|
||||
auto count_unchecked_blocks_one_by_one = [&store, &unchecked] () {
|
||||
size_t count = 0;
|
||||
auto transaction = store->tx_begin_read ();
|
||||
unchecked.for_each (transaction, [&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
unchecked.for_each ([&count] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
++count;
|
||||
});
|
||||
return count;
|
||||
|
@ -263,8 +261,7 @@ TEST (unchecked, multiple_get)
|
|||
|
||||
std::vector<nano::block_hash> unchecked1;
|
||||
// Asserts the entries will be found for the provided key
|
||||
auto transaction = store->tx_begin_read ();
|
||||
auto unchecked1_blocks = unchecked.get (transaction, block1->previous ());
|
||||
auto unchecked1_blocks = unchecked.get (block1->previous ());
|
||||
ASSERT_EQ (unchecked1_blocks.size (), 3);
|
||||
for (auto & i : unchecked1_blocks)
|
||||
{
|
||||
|
@ -276,7 +273,7 @@ TEST (unchecked, multiple_get)
|
|||
ASSERT_TRUE (std::find (unchecked1.begin (), unchecked1.end (), block3->hash ()) != unchecked1.end ());
|
||||
std::vector<nano::block_hash> unchecked2;
|
||||
// Asserts the entries will be found for the provided key
|
||||
auto unchecked2_blocks = unchecked.get (transaction, block1->hash ());
|
||||
auto unchecked2_blocks = unchecked.get (block1->hash ());
|
||||
ASSERT_EQ (unchecked2_blocks.size (), 2);
|
||||
for (auto & i : unchecked2_blocks)
|
||||
{
|
||||
|
@ -286,14 +283,14 @@ TEST (unchecked, multiple_get)
|
|||
ASSERT_TRUE (std::find (unchecked2.begin (), unchecked2.end (), block1->hash ()) != unchecked2.end ());
|
||||
ASSERT_TRUE (std::find (unchecked2.begin (), unchecked2.end (), block2->hash ()) != unchecked2.end ());
|
||||
// Asserts the entry is found by the key and the payload is saved
|
||||
auto unchecked3 = unchecked.get (transaction, block2->previous ());
|
||||
auto unchecked3 = unchecked.get (block2->previous ());
|
||||
ASSERT_EQ (unchecked3.size (), 1);
|
||||
ASSERT_EQ (unchecked3[0].block->hash (), block2->hash ());
|
||||
// Asserts the entry is found by the key and the payload is saved
|
||||
auto unchecked4 = unchecked.get (transaction, block3->hash ());
|
||||
auto unchecked4 = unchecked.get (block3->hash ());
|
||||
ASSERT_EQ (unchecked4.size (), 1);
|
||||
ASSERT_EQ (unchecked4[0].block->hash (), block3->hash ());
|
||||
// Asserts no entry is found for a block that wasn't added
|
||||
auto unchecked5 = unchecked.get (transaction, block2->hash ());
|
||||
auto unchecked5 = unchecked.get (block2->hash ());
|
||||
ASSERT_EQ (unchecked5.size (), 0);
|
||||
}
|
||||
|
|
|
@ -383,6 +383,8 @@ public:
|
|||
uint8_t const protocol_version = 0x13;
|
||||
/** Minimum accepted protocol version */
|
||||
uint8_t const protocol_version_min = 0x12;
|
||||
/** Minimum accepted protocol version used when bootstrapping */
|
||||
uint8_t const bootstrap_protocol_version_min = 0x13;
|
||||
};
|
||||
|
||||
std::string get_node_toml_config_path (boost::filesystem::path const & data_path);
|
||||
|
|
|
@ -45,6 +45,13 @@ enum class type : uint8_t
|
|||
optimistic_scheduler,
|
||||
handshake,
|
||||
|
||||
bootstrap_server_requests,
|
||||
bootstrap_server_responses,
|
||||
bootstrap_ascending,
|
||||
bootstrap_ascending_connections,
|
||||
bootstrap_ascending_thread,
|
||||
bootstrap_ascending_accounts,
|
||||
|
||||
_last // Must be the last enum
|
||||
};
|
||||
|
||||
|
@ -280,6 +287,54 @@ enum class detail : uint8_t
|
|||
missing_cookie,
|
||||
invalid_genesis,
|
||||
|
||||
// bootstrap ascending
|
||||
missing_tag,
|
||||
reply,
|
||||
track,
|
||||
timeout,
|
||||
nothing_new,
|
||||
|
||||
// bootstrap ascending connections
|
||||
connect,
|
||||
connect_missing,
|
||||
connect_failed,
|
||||
connect_success,
|
||||
reuse,
|
||||
|
||||
// bootstrap ascending thread
|
||||
read_block,
|
||||
read_block_done,
|
||||
read_block_end,
|
||||
read_block_error,
|
||||
|
||||
// bootstrap ascending accounts
|
||||
prioritize,
|
||||
prioritize_failed,
|
||||
block,
|
||||
unblock,
|
||||
unblock_failed,
|
||||
|
||||
next_priority,
|
||||
next_database,
|
||||
next_none,
|
||||
|
||||
blocking_insert,
|
||||
blocking_erase_overflow,
|
||||
priority_insert,
|
||||
priority_erase_threshold,
|
||||
priority_erase_block,
|
||||
priority_erase_overflow,
|
||||
deprioritize,
|
||||
deprioritize_failed,
|
||||
|
||||
// active
|
||||
started_hinted,
|
||||
started_optimistic,
|
||||
|
||||
// optimistic
|
||||
pop_gap,
|
||||
pop_leaf,
|
||||
|
||||
_last // Must be the last enum
|
||||
};
|
||||
|
||||
|
|
|
@ -102,6 +102,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
|
|||
case nano::thread_role::name::vote_generator_queue:
|
||||
thread_role_name_string = "Voting que";
|
||||
break;
|
||||
case nano::thread_role::name::ascending_bootstrap:
|
||||
thread_role_name_string = "Bootstrap asc";
|
||||
break;
|
||||
case nano::thread_role::name::bootstrap_server:
|
||||
thread_role_name_string = "Bootstrap serv";
|
||||
break;
|
||||
|
|
|
@ -49,6 +49,9 @@ namespace thread_role
|
|||
bootstrap_server,
|
||||
telemetry,
|
||||
optimistic_scheduler,
|
||||
ascending_bootstrap,
|
||||
bootstrap_server_requests,
|
||||
bootstrap_server_responses,
|
||||
};
|
||||
|
||||
/*
|
||||
|
|
|
@ -426,7 +426,7 @@ int main (int argc, char * const * argv)
|
|||
}
|
||||
|
||||
// Check all unchecked keys for matching frontier hashes. Indicates an issue with process_batch algorithm
|
||||
node->unchecked.for_each (transaction, [&frontier_hashes] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
node->unchecked.for_each ([&frontier_hashes] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
auto it = frontier_hashes.find (key.key ());
|
||||
if (it != frontier_hashes.cend ())
|
||||
{
|
||||
|
@ -999,7 +999,7 @@ int main (int argc, char * const * argv)
|
|||
if (timer_l.after_deadline (std::chrono::seconds (15)))
|
||||
{
|
||||
timer_l.restart ();
|
||||
std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked), %3% remaining") % node->ledger.cache.block_count % node->unchecked.count (node->store.tx_begin_read ()) % node->block_processor.size ()) << std::endl;
|
||||
std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked), %3% remaining") % node->ledger.cache.block_count % node->unchecked.count () % node->block_processor.size ()) << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1847,7 +1847,7 @@ int main (int argc, char * const * argv)
|
|||
if (timer_l.after_deadline (std::chrono::seconds (60)))
|
||||
{
|
||||
timer_l.restart ();
|
||||
std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked)") % node.node->ledger.cache.block_count % node.node->unchecked.count (node.node->store.tx_begin_read ())) << std::endl;
|
||||
std::cout << boost::str (boost::format ("%1% (%2%) blocks processed (unchecked)") % node.node->ledger.cache.block_count % node.node->unchecked.count ()) << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,8 @@ add_library(
|
|||
blockprocessor.cpp
|
||||
bootstrap/block_deserializer.hpp
|
||||
bootstrap/block_deserializer.cpp
|
||||
bootstrap/bootstrap_ascending.hpp
|
||||
bootstrap/bootstrap_ascending.cpp
|
||||
bootstrap/bootstrap_attempt.hpp
|
||||
bootstrap/bootstrap_attempt.cpp
|
||||
bootstrap/bootstrap_bulk_pull.hpp
|
||||
|
|
|
@ -27,7 +27,8 @@ enum class bootstrap_mode
|
|||
{
|
||||
legacy,
|
||||
lazy,
|
||||
wallet_lazy
|
||||
wallet_lazy,
|
||||
ascending
|
||||
};
|
||||
enum class sync_result
|
||||
{
|
||||
|
|
855
nano/node/bootstrap/bootstrap_ascending.cpp
Normal file
855
nano/node/bootstrap/bootstrap_ascending.cpp
Normal file
|
@ -0,0 +1,855 @@
|
|||
#include <nano/lib/stats_enums.hpp>
|
||||
#include <nano/node/bootstrap/block_deserializer.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/transport.hpp>
|
||||
#include <nano/secure/common.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
/*
|
||||
* database_iterator
|
||||
*/
|
||||
|
||||
nano::bootstrap_ascending::database_iterator::database_iterator (nano::store & store_a, table_type table_a) :
|
||||
store{ store_a },
|
||||
table{ table_a }
|
||||
{
|
||||
}
|
||||
|
||||
nano::account nano::bootstrap_ascending::database_iterator::operator* () const
|
||||
{
|
||||
return current;
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::database_iterator::next (nano::transaction & tx)
|
||||
{
|
||||
switch (table)
|
||||
{
|
||||
case table_type::account:
|
||||
{
|
||||
auto i = current.number () + 1;
|
||||
auto item = store.account.begin (tx, i);
|
||||
if (item != store.account.end ())
|
||||
{
|
||||
current = item->first;
|
||||
}
|
||||
else
|
||||
{
|
||||
current = { 0 };
|
||||
}
|
||||
break;
|
||||
}
|
||||
case table_type::pending:
|
||||
{
|
||||
auto i = current.number () + 1;
|
||||
auto item = store.pending.begin (tx, nano::pending_key{ i, 0 });
|
||||
if (item != store.pending.end ())
|
||||
{
|
||||
current = item->first.account;
|
||||
}
|
||||
else
|
||||
{
|
||||
current = { 0 };
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* buffered_iterator
|
||||
*/
|
||||
|
||||
nano::bootstrap_ascending::buffered_iterator::buffered_iterator (nano::store & store_a) :
|
||||
store{ store_a },
|
||||
accounts_iterator{ store, database_iterator::table_type::account },
|
||||
pending_iterator{ store, database_iterator::table_type::pending }
|
||||
{
|
||||
}
|
||||
|
||||
nano::account nano::bootstrap_ascending::buffered_iterator::operator* () const
|
||||
{
|
||||
return !buffer.empty () ? buffer.front () : nano::account{ 0 };
|
||||
}
|
||||
|
||||
nano::account nano::bootstrap_ascending::buffered_iterator::next ()
|
||||
{
|
||||
if (!buffer.empty ())
|
||||
{
|
||||
buffer.pop_front ();
|
||||
}
|
||||
else
|
||||
{
|
||||
fill ();
|
||||
}
|
||||
|
||||
return *(*this);
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::buffered_iterator::fill ()
|
||||
{
|
||||
debug_assert (buffer.empty ());
|
||||
|
||||
// Fill half from accounts table and half from pending table
|
||||
auto transaction = store.tx_begin_read ();
|
||||
|
||||
for (int n = 0; n < size / 2; ++n)
|
||||
{
|
||||
accounts_iterator.next (transaction);
|
||||
if (!(*accounts_iterator).is_zero ())
|
||||
{
|
||||
buffer.push_back (*accounts_iterator);
|
||||
}
|
||||
}
|
||||
|
||||
for (int n = 0; n < size / 2; ++n)
|
||||
{
|
||||
pending_iterator.next (transaction);
|
||||
if (!(*pending_iterator).is_zero ())
|
||||
{
|
||||
buffer.push_back (*pending_iterator);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* account_sets
|
||||
*/
|
||||
|
||||
nano::bootstrap_ascending::account_sets::account_sets (nano::stats & stats_a) :
|
||||
stats{ stats_a }
|
||||
{
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::account_sets::priority_up (nano::account const & account)
|
||||
{
|
||||
if (!blocked (account))
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize);
|
||||
|
||||
auto iter = priorities.get<tag_account> ().find (account);
|
||||
if (iter != priorities.get<tag_account> ().end ())
|
||||
{
|
||||
priorities.get<tag_account> ().modify (iter, [] (auto & val) {
|
||||
val.priority = std::min ((val.priority * account_sets::priority_increase), account_sets::priority_max);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_insert);
|
||||
|
||||
trim_overflow ();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize_failed);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::account_sets::priority_down (nano::account const & account)
|
||||
{
|
||||
auto iter = priorities.get<tag_account> ().find (account);
|
||||
if (iter != priorities.get<tag_account> ().end ())
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize);
|
||||
|
||||
auto priority_new = iter->priority - account_sets::priority_decrease;
|
||||
if (priority_new <= account_sets::priority_cutoff)
|
||||
{
|
||||
priorities.get<tag_account> ().erase (iter);
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_threshold);
|
||||
}
|
||||
else
|
||||
{
|
||||
priorities.get<tag_account> ().modify (iter, [priority_new] (auto & val) {
|
||||
val.priority = priority_new;
|
||||
});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize_failed);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::account_sets::block (nano::account const & account, nano::block_hash const & dependency)
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::block);
|
||||
|
||||
auto existing = priorities.get<tag_account> ().find (account);
|
||||
auto entry = existing == priorities.get<tag_account> ().end () ? priority_entry{ 0, 0 } : *existing;
|
||||
|
||||
priorities.get<tag_account> ().erase (account);
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_block);
|
||||
|
||||
blocking.get<tag_account> ().insert ({ account, dependency, entry });
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_insert);
|
||||
|
||||
trim_overflow ();
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::account_sets::unblock (nano::account const & account, std::optional<nano::block_hash> const & hash)
|
||||
{
|
||||
// Unblock only if the dependency is fulfilled
|
||||
auto existing = blocking.get<tag_account> ().find (account);
|
||||
if (existing != blocking.get<tag_account> ().end () && (!hash || existing->dependency == *hash))
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::unblock);
|
||||
|
||||
debug_assert (priorities.get<tag_account> ().count (account) == 0);
|
||||
if (!existing->original_entry.account.is_zero ())
|
||||
{
|
||||
debug_assert (existing->original_entry.account == account);
|
||||
priorities.get<tag_account> ().insert (existing->original_entry);
|
||||
}
|
||||
else
|
||||
{
|
||||
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
|
||||
}
|
||||
blocking.get<tag_account> ().erase (account);
|
||||
|
||||
trim_overflow ();
|
||||
}
|
||||
else
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::unblock_failed);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::account_sets::timestamp (const nano::account & account, bool reset)
|
||||
{
|
||||
const nano::millis_t tstamp = reset ? 0 : nano::milliseconds_since_epoch ();
|
||||
|
||||
auto iter = priorities.get<tag_account> ().find (account);
|
||||
if (iter != priorities.get<tag_account> ().end ())
|
||||
{
|
||||
priorities.get<tag_account> ().modify (iter, [tstamp] (auto & entry) {
|
||||
entry.timestamp = tstamp;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
bool nano::bootstrap_ascending::account_sets::check_timestamp (const nano::account & account) const
|
||||
{
|
||||
auto iter = priorities.get<tag_account> ().find (account);
|
||||
if (iter != priorities.get<tag_account> ().end ())
|
||||
{
|
||||
if (nano::milliseconds_since_epoch () - iter->timestamp < cooldown)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::account_sets::trim_overflow ()
|
||||
{
|
||||
if (priorities.size () > priorities_max)
|
||||
{
|
||||
// Evict the lowest priority entry
|
||||
priorities.get<tag_priority> ().erase (priorities.get<tag_priority> ().begin ());
|
||||
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_overflow);
|
||||
}
|
||||
if (blocking.size () > blocking_max)
|
||||
{
|
||||
// Evict the lowest priority entry
|
||||
blocking.get<tag_priority> ().erase (blocking.get<tag_priority> ().begin ());
|
||||
|
||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_erase_overflow);
|
||||
}
|
||||
}
|
||||
|
||||
nano::account nano::bootstrap_ascending::account_sets::next ()
|
||||
{
|
||||
if (priorities.empty ())
|
||||
{
|
||||
return { 0 };
|
||||
}
|
||||
|
||||
std::vector<float> weights;
|
||||
std::vector<nano::account> candidates;
|
||||
|
||||
int iterations = 0;
|
||||
while (candidates.size () < account_sets::consideration_count && iterations++ < account_sets::consideration_count * 10)
|
||||
{
|
||||
debug_assert (candidates.size () == weights.size ());
|
||||
|
||||
// Use a dedicated, uniformly distributed field for sampling to avoid problematic corner case when accounts in the queue are very close together
|
||||
auto search = bootstrap_ascending::generate_id ();
|
||||
auto iter = priorities.get<tag_id> ().lower_bound (search);
|
||||
if (iter == priorities.get<tag_id> ().end ())
|
||||
{
|
||||
iter = priorities.get<tag_id> ().begin ();
|
||||
}
|
||||
|
||||
if (check_timestamp (iter->account))
|
||||
{
|
||||
candidates.push_back (iter->account);
|
||||
weights.push_back (iter->priority);
|
||||
}
|
||||
}
|
||||
|
||||
if (candidates.empty ())
|
||||
{
|
||||
return { 0 }; // All sampled accounts are busy
|
||||
}
|
||||
|
||||
std::discrete_distribution dist{ weights.begin (), weights.end () };
|
||||
auto selection = dist (rng);
|
||||
debug_assert (!weights.empty () && selection < weights.size ());
|
||||
auto result = candidates[selection];
|
||||
return result;
|
||||
}
|
||||
|
||||
bool nano::bootstrap_ascending::account_sets::blocked (nano::account const & account) const
|
||||
{
|
||||
return blocking.get<tag_account> ().count (account) > 0;
|
||||
}
|
||||
|
||||
std::size_t nano::bootstrap_ascending::account_sets::priority_size () const
|
||||
{
|
||||
return priorities.size ();
|
||||
}
|
||||
|
||||
std::size_t nano::bootstrap_ascending::account_sets::blocked_size () const
|
||||
{
|
||||
return blocking.size ();
|
||||
}
|
||||
|
||||
float nano::bootstrap_ascending::account_sets::priority (nano::account const & account) const
|
||||
{
|
||||
if (blocked (account))
|
||||
{
|
||||
return 0.0f;
|
||||
}
|
||||
auto existing = priorities.get<tag_account> ().find (account);
|
||||
if (existing != priorities.get<tag_account> ().end ())
|
||||
{
|
||||
return existing->priority;
|
||||
}
|
||||
return account_sets::priority_cutoff;
|
||||
}
|
||||
|
||||
auto nano::bootstrap_ascending::account_sets::info () const -> info_t
|
||||
{
|
||||
return { blocking, priorities };
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::account_sets::collect_container_info (const std::string & name)
|
||||
{
|
||||
auto composite = std::make_unique<container_info_composite> (name);
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "priorities", priorities.size (), sizeof (decltype (priorities)::value_type) }));
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocking", blocking.size (), sizeof (decltype (blocking)::value_type) }));
|
||||
return composite;
|
||||
}
|
||||
|
||||
/*
|
||||
* priority_entry
|
||||
*/
|
||||
|
||||
nano::bootstrap_ascending::account_sets::priority_entry::priority_entry (nano::account account_a, float priority_a) :
|
||||
account{ account_a },
|
||||
priority{ priority_a }
|
||||
{
|
||||
id = nano::bootstrap_ascending::generate_id ();
|
||||
}
|
||||
|
||||
/*
|
||||
* bootstrap_ascending
|
||||
*/
|
||||
|
||||
nano::bootstrap_ascending::bootstrap_ascending (nano::node & node_a, nano::store & store_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a) :
|
||||
node{ node_a },
|
||||
store{ store_a },
|
||||
block_processor{ block_processor_a },
|
||||
ledger{ ledger_a },
|
||||
network{ network_a },
|
||||
stats{ stat_a },
|
||||
accounts{ stats },
|
||||
iterator{ store },
|
||||
limiter{ requests_limit, 1.0 },
|
||||
database_limiter{ database_requests_limit, 1.0 }
|
||||
{
|
||||
// TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread
|
||||
block_processor.batch_processed.add ([this] (auto const & batch) {
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
|
||||
auto transaction = store.tx_begin_read ();
|
||||
for (auto const & [result, block] : batch)
|
||||
{
|
||||
debug_assert (block != nullptr);
|
||||
|
||||
inspect (transaction, result, *block);
|
||||
}
|
||||
}
|
||||
|
||||
condition.notify_all ();
|
||||
});
|
||||
}
|
||||
|
||||
nano::bootstrap_ascending::~bootstrap_ascending ()
|
||||
{
|
||||
// All threads must be stopped before destruction
|
||||
debug_assert (!thread.joinable ());
|
||||
debug_assert (!timeout_thread.joinable ());
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::start ()
|
||||
{
|
||||
debug_assert (!thread.joinable ());
|
||||
debug_assert (!timeout_thread.joinable ());
|
||||
|
||||
thread = std::thread ([this] () {
|
||||
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
|
||||
run ();
|
||||
});
|
||||
|
||||
timeout_thread = std::thread ([this] () {
|
||||
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
|
||||
run_timeouts ();
|
||||
});
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::stop ()
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
stopped = true;
|
||||
lock.unlock ();
|
||||
condition.notify_all ();
|
||||
nano::join_or_pass (thread);
|
||||
nano::join_or_pass (timeout_thread);
|
||||
}
|
||||
|
||||
nano::bootstrap_ascending::id_t nano::bootstrap_ascending::generate_id ()
|
||||
{
|
||||
id_t id;
|
||||
nano::random_pool::generate_block (reinterpret_cast<uint8_t *> (&id), sizeof (id));
|
||||
return id;
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::send (std::shared_ptr<nano::transport::channel> channel, async_tag tag)
|
||||
{
|
||||
debug_assert (tag.type == async_tag::query_type::blocks_by_hash || tag.type == async_tag::query_type::blocks_by_account);
|
||||
|
||||
nano::asc_pull_req request{ node.network_params.network };
|
||||
request.id = tag.id;
|
||||
request.type = nano::asc_pull_type::blocks;
|
||||
|
||||
nano::asc_pull_req::blocks_payload request_payload;
|
||||
request_payload.start = tag.start;
|
||||
request_payload.count = pull_count;
|
||||
request_payload.start_type = tag.type == async_tag::query_type::blocks_by_hash ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account;
|
||||
|
||||
request.payload = request_payload;
|
||||
request.update_header ();
|
||||
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out);
|
||||
|
||||
// TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests
|
||||
channel->send (
|
||||
request, nullptr,
|
||||
nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap);
|
||||
}
|
||||
|
||||
size_t nano::bootstrap_ascending::priority_size () const
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
return accounts.priority_size ();
|
||||
}
|
||||
|
||||
size_t nano::bootstrap_ascending::blocked_size () const
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
return accounts.blocked_size ();
|
||||
}
|
||||
|
||||
/** Inspects a block that has been processed by the block processor
|
||||
- Marks an account as blocked if the result code is gap source as there is no reason request additional blocks for this account until the dependency is resolved
|
||||
- Marks an account as forwarded if it has been recently referenced by a block that has been inserted.
|
||||
*/
|
||||
void nano::bootstrap_ascending::inspect (nano::transaction const & tx, nano::process_return const & result, nano::block const & block)
|
||||
{
|
||||
auto const hash = block.hash ();
|
||||
|
||||
switch (result.code)
|
||||
{
|
||||
case nano::process_result::progress:
|
||||
{
|
||||
const auto account = ledger.account (tx, hash);
|
||||
const auto is_send = ledger.is_send (tx, block);
|
||||
|
||||
// If we've inserted any block in to an account, unmark it as blocked
|
||||
accounts.unblock (account);
|
||||
accounts.priority_up (account);
|
||||
accounts.timestamp (account, /* reset timestamp */ true);
|
||||
|
||||
if (is_send)
|
||||
{
|
||||
// TODO: Encapsulate this as a helper somewhere
|
||||
nano::account destination{ 0 };
|
||||
switch (block.type ())
|
||||
{
|
||||
case nano::block_type::send:
|
||||
destination = block.destination ();
|
||||
break;
|
||||
case nano::block_type::state:
|
||||
destination = block.link ().as_account ();
|
||||
break;
|
||||
default:
|
||||
debug_assert (false, "unexpected block type");
|
||||
break;
|
||||
}
|
||||
if (!destination.is_zero ())
|
||||
{
|
||||
accounts.unblock (destination, hash); // Unblocking automatically inserts account into priority set
|
||||
accounts.priority_up (destination);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
case nano::process_result::gap_source:
|
||||
{
|
||||
const auto account = block.previous ().is_zero () ? block.account () : ledger.account (tx, block.previous ());
|
||||
const auto source = block.source ().is_zero () ? block.link ().as_block_hash () : block.source ();
|
||||
|
||||
// Mark account as blocked because it is missing the source block
|
||||
accounts.block (account, source);
|
||||
|
||||
// TODO: Track stats
|
||||
}
|
||||
break;
|
||||
case nano::process_result::old:
|
||||
{
|
||||
// TODO: Track stats
|
||||
}
|
||||
break;
|
||||
case nano::process_result::gap_previous:
|
||||
{
|
||||
// TODO: Track stats
|
||||
}
|
||||
break;
|
||||
default: // No need to handle other cases
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::wait_blockprocessor ()
|
||||
{
|
||||
while (!stopped && block_processor.half_full ())
|
||||
{
|
||||
std::this_thread::sleep_for (500ms); // Blockprocessor is relatively slow, sleeping here instead of using conditions
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::wait_available_request ()
|
||||
{
|
||||
while (!stopped && !limiter.should_pass (1))
|
||||
{
|
||||
std::this_thread::sleep_for (50ms); // Give it at least some time to cooldown to avoid hitting the limit too frequently
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::available_channel ()
|
||||
{
|
||||
auto channels = network.random_set (32, node.network_params.network.bootstrap_protocol_version_min, /* include temporary channels */ true);
|
||||
for (auto & channel : channels)
|
||||
{
|
||||
if (!channel->max ())
|
||||
{
|
||||
return channel;
|
||||
}
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::wait_available_channel ()
|
||||
{
|
||||
std::shared_ptr<nano::transport::channel> channel;
|
||||
while (!stopped && !(channel = available_channel ()))
|
||||
{
|
||||
std::this_thread::sleep_for (100ms);
|
||||
}
|
||||
return channel;
|
||||
}
|
||||
|
||||
nano::account nano::bootstrap_ascending::available_account ()
|
||||
{
|
||||
{
|
||||
auto account = accounts.next ();
|
||||
if (!account.is_zero ())
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_priority);
|
||||
return account;
|
||||
}
|
||||
}
|
||||
|
||||
if (database_limiter.should_pass (1))
|
||||
{
|
||||
auto account = iterator.next ();
|
||||
if (!account.is_zero ())
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_database);
|
||||
return account;
|
||||
}
|
||||
}
|
||||
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_none);
|
||||
return { 0 };
|
||||
}
|
||||
|
||||
nano::account nano::bootstrap_ascending::wait_available_account ()
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
while (!stopped)
|
||||
{
|
||||
auto account = available_account ();
|
||||
if (!account.is_zero ())
|
||||
{
|
||||
accounts.timestamp (account);
|
||||
return account;
|
||||
}
|
||||
else
|
||||
{
|
||||
condition.wait_for (lock, 100ms);
|
||||
}
|
||||
}
|
||||
return { 0 };
|
||||
}
|
||||
|
||||
bool nano::bootstrap_ascending::request (nano::account & account, std::shared_ptr<nano::transport::channel> & channel)
|
||||
{
|
||||
async_tag tag{};
|
||||
tag.id = generate_id ();
|
||||
tag.account = account;
|
||||
tag.time = nano::milliseconds_since_epoch ();
|
||||
|
||||
// Check if the account picked has blocks, if it does, start the pull from the highest block
|
||||
auto info = store.account.get (store.tx_begin_read (), account);
|
||||
if (info)
|
||||
{
|
||||
tag.type = async_tag::query_type::blocks_by_hash;
|
||||
tag.start = info->head;
|
||||
}
|
||||
else
|
||||
{
|
||||
tag.type = async_tag::query_type::blocks_by_account;
|
||||
tag.start = account;
|
||||
}
|
||||
|
||||
on_request.notify (tag, channel);
|
||||
|
||||
track (tag);
|
||||
send (channel, tag);
|
||||
|
||||
return true; // Request sent
|
||||
}
|
||||
|
||||
bool nano::bootstrap_ascending::run_one ()
|
||||
{
|
||||
// Ensure there is enough space in blockprocessor for queuing new blocks
|
||||
wait_blockprocessor ();
|
||||
|
||||
// Do not do too many requests in parallel, impose throttling
|
||||
wait_available_request ();
|
||||
|
||||
// Waits for channel that is not full
|
||||
auto channel = wait_available_channel ();
|
||||
if (!channel)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
// Waits for account either from priority queue or database
|
||||
auto account = wait_available_account ();
|
||||
if (account.is_zero ())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool success = request (account, channel);
|
||||
return success;
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::run ()
|
||||
{
|
||||
while (!stopped)
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop);
|
||||
run_one ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::run_timeouts ()
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
while (!stopped)
|
||||
{
|
||||
auto & tags_by_order = tags.get<tag_sequenced> ();
|
||||
while (!tags_by_order.empty () && nano::time_difference (tags_by_order.front ().time, nano::milliseconds_since_epoch ()) > timeout)
|
||||
{
|
||||
auto tag = tags_by_order.front ();
|
||||
tags_by_order.pop_front ();
|
||||
on_timeout.notify (tag);
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout);
|
||||
}
|
||||
condition.wait_for (lock, 1s, [this] () { return stopped; });
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::process (const nano::asc_pull_ack & message)
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
|
||||
// Only process messages that have a known tag
|
||||
auto & tags_by_id = tags.get<tag_id> ();
|
||||
if (tags_by_id.count (message.id) > 0)
|
||||
{
|
||||
auto iterator = tags_by_id.find (message.id);
|
||||
auto tag = *iterator;
|
||||
tags_by_id.erase (iterator);
|
||||
|
||||
lock.unlock ();
|
||||
|
||||
on_reply.notify (tag);
|
||||
condition.notify_all ();
|
||||
std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload);
|
||||
}
|
||||
else
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::missing_tag);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::process (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::async_tag & tag)
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply);
|
||||
|
||||
auto result = verify (response, tag);
|
||||
switch (result)
|
||||
{
|
||||
case verify_result::ok:
|
||||
{
|
||||
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ());
|
||||
|
||||
for (auto & block : response.blocks)
|
||||
{
|
||||
block_processor.add (block);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case verify_result::nothing_new:
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new);
|
||||
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
accounts.priority_down (tag.account);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case verify_result::invalid:
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid);
|
||||
// TODO: Log
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::process (const nano::asc_pull_ack::account_info_payload & response, const nano::bootstrap_ascending::async_tag & tag)
|
||||
{
|
||||
// TODO: Make use of account info
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::process (const nano::empty_payload & response, const nano::bootstrap_ascending::async_tag & tag)
|
||||
{
|
||||
// Should not happen
|
||||
debug_assert (false, "empty payload");
|
||||
}
|
||||
|
||||
nano::bootstrap_ascending::verify_result nano::bootstrap_ascending::verify (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::async_tag & tag) const
|
||||
{
|
||||
auto const & blocks = response.blocks;
|
||||
|
||||
if (blocks.empty ())
|
||||
{
|
||||
return verify_result::nothing_new;
|
||||
}
|
||||
if (blocks.size () == 1 && blocks.front ()->hash () == tag.start.as_block_hash ())
|
||||
{
|
||||
return verify_result::nothing_new;
|
||||
}
|
||||
|
||||
auto const & first = blocks.front ();
|
||||
switch (tag.type)
|
||||
{
|
||||
case async_tag::query_type::blocks_by_hash:
|
||||
{
|
||||
if (first->hash () != tag.start.as_block_hash ())
|
||||
{
|
||||
// TODO: Stat & log
|
||||
return verify_result::invalid;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case async_tag::query_type::blocks_by_account:
|
||||
{
|
||||
// Open & state blocks always contain account field
|
||||
if (first->account () != tag.start.as_account ())
|
||||
{
|
||||
// TODO: Stat & log
|
||||
return verify_result::invalid;
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
return verify_result::invalid;
|
||||
}
|
||||
|
||||
// Verify blocks make a valid chain
|
||||
nano::block_hash previous_hash = blocks.front ()->hash ();
|
||||
for (int n = 1; n < blocks.size (); ++n)
|
||||
{
|
||||
auto & block = blocks[n];
|
||||
if (block->previous () != previous_hash)
|
||||
{
|
||||
// TODO: Stat & log
|
||||
return verify_result::invalid; // Blocks do not make a chain
|
||||
}
|
||||
previous_hash = block->hash ();
|
||||
}
|
||||
|
||||
return verify_result::ok;
|
||||
}
|
||||
|
||||
void nano::bootstrap_ascending::track (async_tag const & tag)
|
||||
{
|
||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::track);
|
||||
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
debug_assert (tags.get<tag_id> ().count (tag.id) == 0);
|
||||
tags.get<tag_id> ().insert (tag);
|
||||
}
|
||||
|
||||
auto nano::bootstrap_ascending::info () const -> account_sets::info_t
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
return accounts.info ();
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::collect_container_info (std::string const & name)
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
|
||||
auto composite = std::make_unique<container_info_composite> (name);
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "tags", tags.size (), sizeof (decltype (tags)::value_type) }));
|
||||
composite->add_component (accounts.collect_container_info ("accounts"));
|
||||
return composite;
|
||||
}
|
329
nano/node/bootstrap/bootstrap_ascending.hpp
Normal file
329
nano/node/bootstrap/bootstrap_ascending.hpp
Normal file
|
@ -0,0 +1,329 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/lib/observer_set.hpp>
|
||||
#include <nano/lib/timer.hpp>
|
||||
#include <nano/node/bandwidth_limiter.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_server.hpp>
|
||||
|
||||
#include <boost/multi_index/hashed_index.hpp>
|
||||
#include <boost/multi_index/mem_fun.hpp>
|
||||
#include <boost/multi_index/member.hpp>
|
||||
#include <boost/multi_index/ordered_index.hpp>
|
||||
#include <boost/multi_index/random_access_index.hpp>
|
||||
#include <boost/multi_index/sequenced_index.hpp>
|
||||
#include <boost/multi_index_container.hpp>
|
||||
|
||||
#include <random>
|
||||
#include <thread>
|
||||
|
||||
namespace mi = boost::multi_index;
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class block_processor;
|
||||
class ledger;
|
||||
class network;
|
||||
|
||||
namespace transport
|
||||
{
|
||||
class channel;
|
||||
}
|
||||
|
||||
class bootstrap_ascending
|
||||
{
|
||||
using id_t = uint64_t;
|
||||
|
||||
public:
|
||||
bootstrap_ascending (nano::node &, nano::store &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &);
|
||||
~bootstrap_ascending ();
|
||||
|
||||
void start ();
|
||||
void stop ();
|
||||
|
||||
/**
|
||||
* Process `asc_pull_ack` message coming from network
|
||||
*/
|
||||
void process (nano::asc_pull_ack const & message);
|
||||
|
||||
public: // Container info
|
||||
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
|
||||
size_t blocked_size () const;
|
||||
size_t priority_size () const;
|
||||
|
||||
private: // Dependencies
|
||||
nano::node & node;
|
||||
nano::store & store;
|
||||
nano::block_processor & block_processor;
|
||||
nano::ledger & ledger;
|
||||
nano::network & network;
|
||||
nano::stats & stats;
|
||||
|
||||
public: // async_tag
|
||||
struct async_tag
|
||||
{
|
||||
enum class query_type
|
||||
{
|
||||
invalid = 0, // Default initialization
|
||||
blocks_by_hash,
|
||||
blocks_by_account,
|
||||
// TODO: account_info,
|
||||
};
|
||||
|
||||
query_type type{ query_type::invalid };
|
||||
id_t id{ 0 };
|
||||
nano::hash_or_account start{ 0 };
|
||||
nano::millis_t time{ 0 };
|
||||
nano::account account{ 0 };
|
||||
};
|
||||
|
||||
public: // Events
|
||||
nano::observer_set<async_tag const &, std::shared_ptr<nano::transport::channel> &> on_request;
|
||||
nano::observer_set<async_tag const &> on_reply;
|
||||
nano::observer_set<async_tag const &> on_timeout;
|
||||
|
||||
private:
|
||||
/* Inspects a block that has been processed by the block processor */
|
||||
void inspect (nano::transaction const &, nano::process_return const & result, nano::block const & block);
|
||||
|
||||
void run ();
|
||||
bool run_one ();
|
||||
void run_timeouts ();
|
||||
|
||||
/* Limits the number of requests per second we make */
|
||||
void wait_available_request ();
|
||||
/* Throttles requesting new blocks, not to overwhelm blockprocessor */
|
||||
void wait_blockprocessor ();
|
||||
/* Waits for channel with free capacity for bootstrap messages */
|
||||
std::shared_ptr<nano::transport::channel> wait_available_channel ();
|
||||
std::shared_ptr<nano::transport::channel> available_channel ();
|
||||
/* Waits until a suitable account outside of cool down period is available */
|
||||
nano::account available_account ();
|
||||
nano::account wait_available_account ();
|
||||
|
||||
bool request (nano::account &, std::shared_ptr<nano::transport::channel> &);
|
||||
void send (std::shared_ptr<nano::transport::channel>, async_tag tag);
|
||||
void track (async_tag const & tag);
|
||||
|
||||
void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag);
|
||||
void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag);
|
||||
void process (nano::empty_payload const & response, async_tag const & tag);
|
||||
|
||||
enum class verify_result
|
||||
{
|
||||
ok,
|
||||
nothing_new,
|
||||
invalid,
|
||||
};
|
||||
|
||||
/**
|
||||
* Verifies whether the received response is valid. Returns:
|
||||
* - invalid: when received blocks do not correspond to requested hash/account or they do not make a valid chain
|
||||
* - nothing_new: when received response indicates that the account chain does not have more blocks
|
||||
* - ok: otherwise, if all checks pass
|
||||
*/
|
||||
verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const;
|
||||
|
||||
static id_t generate_id ();
|
||||
|
||||
public: // account_sets
|
||||
/** This class tracks accounts various account sets which are shared among the multiple bootstrap threads */
|
||||
class account_sets
|
||||
{
|
||||
public:
|
||||
explicit account_sets (nano::stats &);
|
||||
|
||||
/**
|
||||
* If an account is not blocked, increase its priority.
|
||||
* If the account does not exist in priority set and is not blocked, inserts a new entry.
|
||||
* Current implementation increases priority by 1.0f each increment
|
||||
*/
|
||||
void priority_up (nano::account const & account);
|
||||
/**
|
||||
* Decreases account priority
|
||||
* Current implementation divides priority by 2.0f and saturates down to 1.0f.
|
||||
*/
|
||||
void priority_down (nano::account const & account);
|
||||
void block (nano::account const & account, nano::block_hash const & dependency);
|
||||
void unblock (nano::account const & account, std::optional<nano::block_hash> const & hash = std::nullopt);
|
||||
void timestamp (nano::account const & account, bool reset = false);
|
||||
|
||||
nano::account next ();
|
||||
|
||||
public:
|
||||
bool blocked (nano::account const & account) const;
|
||||
std::size_t priority_size () const;
|
||||
std::size_t blocked_size () const;
|
||||
/**
|
||||
* Accounts in the ledger but not in priority list are assumed priority 1.0f
|
||||
* Blocked accounts are assumed priority 0.0f
|
||||
*/
|
||||
float priority (nano::account const & account) const;
|
||||
|
||||
public: // Container info
|
||||
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
|
||||
|
||||
private:
|
||||
void trim_overflow ();
|
||||
bool check_timestamp (nano::account const & account) const;
|
||||
|
||||
private: // Dependencies
|
||||
nano::stats & stats;
|
||||
|
||||
private:
|
||||
struct priority_entry
|
||||
{
|
||||
nano::account account{ 0 };
|
||||
float priority{ 0 };
|
||||
nano::millis_t timestamp{ 0 };
|
||||
id_t id{ 0 }; // Uniformly distributed, used for random querying
|
||||
|
||||
priority_entry (nano::account account, float priority);
|
||||
};
|
||||
|
||||
struct blocking_entry
|
||||
{
|
||||
nano::account account{ 0 };
|
||||
nano::block_hash dependency{ 0 };
|
||||
priority_entry original_entry{ 0, 0 };
|
||||
|
||||
float priority () const
|
||||
{
|
||||
return original_entry.priority;
|
||||
}
|
||||
};
|
||||
|
||||
// clang-format off
|
||||
class tag_account {};
|
||||
class tag_priority {};
|
||||
class tag_sequenced {};
|
||||
class tag_id {};
|
||||
|
||||
// Tracks the ongoing account priorities
|
||||
// This only stores account priorities > 1.0f.
|
||||
using ordered_priorities = boost::multi_index_container<priority_entry,
|
||||
mi::indexed_by<
|
||||
mi::sequenced<mi::tag<tag_sequenced>>,
|
||||
mi::ordered_unique<mi::tag<tag_account>,
|
||||
mi::member<priority_entry, nano::account, &priority_entry::account>>,
|
||||
mi::ordered_non_unique<mi::tag<tag_priority>,
|
||||
mi::member<priority_entry, float, &priority_entry::priority>>,
|
||||
mi::ordered_unique<mi::tag<tag_id>,
|
||||
mi::member<priority_entry, bootstrap_ascending::id_t, &priority_entry::id>>
|
||||
>>;
|
||||
|
||||
// A blocked account is an account that has failed to insert a new block because the source block is not currently present in the ledger
|
||||
// An account is unblocked once it has a block successfully inserted
|
||||
using ordered_blocking = boost::multi_index_container<blocking_entry,
|
||||
mi::indexed_by<
|
||||
mi::sequenced<mi::tag<tag_sequenced>>,
|
||||
mi::ordered_unique<mi::tag<tag_account>,
|
||||
mi::member<blocking_entry, nano::account, &blocking_entry::account>>,
|
||||
mi::ordered_non_unique<mi::tag<tag_priority>,
|
||||
mi::const_mem_fun<blocking_entry, float, &blocking_entry::priority>>
|
||||
>>;
|
||||
// clang-format on
|
||||
|
||||
ordered_priorities priorities;
|
||||
ordered_blocking blocking;
|
||||
|
||||
std::default_random_engine rng;
|
||||
|
||||
private: // TODO: Move into config
|
||||
static std::size_t constexpr consideration_count = 4;
|
||||
static std::size_t constexpr priorities_max = 256 * 1024;
|
||||
static std::size_t constexpr blocking_max = 256 * 1024;
|
||||
static nano::millis_t constexpr cooldown = 3 * 1000;
|
||||
|
||||
public: // Consts
|
||||
static float constexpr priority_initial = 8.0f;
|
||||
static float constexpr priority_increase = 2.0f;
|
||||
static float constexpr priority_decrease = 0.5f;
|
||||
static float constexpr priority_max = 32.0f;
|
||||
static float constexpr priority_cutoff = 1.0f;
|
||||
|
||||
public:
|
||||
using info_t = std::tuple<decltype (blocking), decltype (priorities)>; // <blocking, priorities>
|
||||
info_t info () const;
|
||||
};
|
||||
|
||||
account_sets::info_t info () const;
|
||||
|
||||
private: // Database iterators
|
||||
class database_iterator
|
||||
{
|
||||
public:
|
||||
enum class table_type
|
||||
{
|
||||
account,
|
||||
pending
|
||||
};
|
||||
|
||||
explicit database_iterator (nano::store & store, table_type);
|
||||
nano::account operator* () const;
|
||||
void next (nano::transaction & tx);
|
||||
|
||||
private:
|
||||
nano::store & store;
|
||||
nano::account current{ 0 };
|
||||
const table_type table;
|
||||
};
|
||||
|
||||
class buffered_iterator
|
||||
{
|
||||
public:
|
||||
explicit buffered_iterator (nano::store & store);
|
||||
nano::account operator* () const;
|
||||
nano::account next ();
|
||||
|
||||
private:
|
||||
void fill ();
|
||||
|
||||
private:
|
||||
nano::store & store;
|
||||
std::deque<nano::account> buffer;
|
||||
|
||||
database_iterator accounts_iterator;
|
||||
database_iterator pending_iterator;
|
||||
|
||||
static std::size_t constexpr size = 1024;
|
||||
};
|
||||
|
||||
private:
|
||||
account_sets accounts;
|
||||
buffered_iterator iterator;
|
||||
|
||||
// clang-format off
|
||||
class tag_sequenced {};
|
||||
class tag_id {};
|
||||
class tag_account {};
|
||||
|
||||
using ordered_tags = boost::multi_index_container<async_tag,
|
||||
mi::indexed_by<
|
||||
mi::sequenced<mi::tag<tag_sequenced>>,
|
||||
mi::hashed_unique<mi::tag<tag_id>,
|
||||
mi::member<async_tag, id_t, &async_tag::id>>,
|
||||
mi::hashed_non_unique<mi::tag<tag_account>,
|
||||
mi::member<async_tag, nano::account , &async_tag::account>>
|
||||
>>;
|
||||
// clang-format on
|
||||
ordered_tags tags;
|
||||
|
||||
nano::bandwidth_limiter limiter;
|
||||
// Requests for accounts from database have much lower hitrate and could introduce strain on the network
|
||||
// A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue
|
||||
nano::bandwidth_limiter database_limiter;
|
||||
|
||||
bool stopped{ false };
|
||||
mutable nano::mutex mutex;
|
||||
mutable nano::condition_variable condition;
|
||||
std::thread thread;
|
||||
std::thread timeout_thread;
|
||||
|
||||
private: // TODO: Move into config
|
||||
static std::size_t constexpr requests_limit{ 128 };
|
||||
static std::size_t constexpr database_requests_limit{ 1024 };
|
||||
static std::size_t constexpr pull_count{ nano::bootstrap_server::max_blocks };
|
||||
static nano::millis_t constexpr timeout{ 1000 * 3 };
|
||||
};
|
||||
}
|
|
@ -99,6 +99,8 @@ char const * nano::bootstrap_attempt::mode_text ()
|
|||
return "lazy";
|
||||
case nano::bootstrap_mode::wallet_lazy:
|
||||
return "wallet_lazy";
|
||||
case nano::bootstrap_mode::ascending:
|
||||
return "ascending";
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
|
|
|
@ -348,7 +348,7 @@ void nano::bulk_pull_server::set_current_end ()
|
|||
connection->node->logger.try_log (boost::str (boost::format ("Bulk pull request for block hash: %1%") % request->start.to_string ()));
|
||||
}
|
||||
|
||||
current = request->start.as_block_hash ();
|
||||
current = ascending () ? connection->node->store.block.successor (transaction, request->start.as_block_hash ()) : request->start.as_block_hash ();
|
||||
include_start = true;
|
||||
}
|
||||
else
|
||||
|
|
|
@ -172,8 +172,8 @@ void nano::bootstrap_connections::connect_client (nano::tcp_endpoint const & end
|
|||
case boost::system::errc::connection_refused:
|
||||
case boost::system::errc::operation_canceled:
|
||||
case boost::system::errc::timed_out:
|
||||
case 995: //Windows The I/O operation has been aborted because of either a thread exit or an application request
|
||||
case 10061: //Windows No connection could be made because the target machine actively refused it
|
||||
case 995: // Windows The I/O operation has been aborted because of either a thread exit or an application request
|
||||
case 10061: // Windows No connection could be made because the target machine actively refused it
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -286,7 +286,7 @@ void nano::bootstrap_connections::populate_connections (bool repeat)
|
|||
// Not many peers respond, need to try to make more connections than we need.
|
||||
for (auto i = 0u; i < delta; i++)
|
||||
{
|
||||
auto endpoint (node.network.bootstrap_peer ());
|
||||
auto endpoint (node.network.bootstrap_peer ()); // Legacy bootstrap is compatible with older version of protocol
|
||||
if (endpoint != nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0) && (node.flags.allow_bootstrap_peers_duplicates || endpoints.find (endpoint) == endpoints.end ()) && !node.network.excluded_peers.check (endpoint))
|
||||
{
|
||||
connect_client (endpoint);
|
||||
|
|
|
@ -94,6 +94,7 @@ void nano::add_node_flag_options (boost::program_options::options_description &
|
|||
("disable_legacy_bootstrap", "Disables legacy bootstrap")
|
||||
("disable_wallet_bootstrap", "Disables wallet lazy bootstrap")
|
||||
("disable_ongoing_bootstrap", "Disable ongoing bootstrap")
|
||||
("disable_ascending_bootstrap", "Disable ascending bootstrap")
|
||||
("disable_rep_crawler", "Disable rep crawler")
|
||||
("disable_request_loop", "Disable request loop")
|
||||
("disable_bootstrap_listener", "Disables bootstrap processing for TCP listener (not including realtime network TCP connections)")
|
||||
|
@ -122,6 +123,7 @@ std::error_code nano::update_flags (nano::node_flags & flags_a, boost::program_o
|
|||
flags_a.disable_legacy_bootstrap = (vm.count ("disable_legacy_bootstrap") > 0);
|
||||
flags_a.disable_wallet_bootstrap = (vm.count ("disable_wallet_bootstrap") > 0);
|
||||
flags_a.disable_ongoing_bootstrap = (vm.count ("disable_ongoing_bootstrap") > 0);
|
||||
flags_a.disable_ascending_bootstrap = (vm.count ("disable_ascending_bootstrap") > 0);
|
||||
flags_a.disable_rep_crawler = (vm.count ("disable_rep_crawler") > 0);
|
||||
flags_a.disable_request_loop = (vm.count ("disable_request_loop") > 0);
|
||||
if (!flags_a.inactive_node)
|
||||
|
@ -213,7 +215,7 @@ bool copy_database (boost::filesystem::path const & data_path, boost::program_op
|
|||
auto & store (node.node->store);
|
||||
if (vm.count ("unchecked_clear"))
|
||||
{
|
||||
node.node->unchecked.clear (store.tx_begin_write ());
|
||||
node.node->unchecked.clear ();
|
||||
}
|
||||
if (vm.count ("clear_send_ids"))
|
||||
{
|
||||
|
@ -491,7 +493,7 @@ std::error_code nano::handle_node_options (boost::program_options::variables_map
|
|||
if (!node.node->init_error ())
|
||||
{
|
||||
auto transaction (node.node->store.tx_begin_write ());
|
||||
node.node->unchecked.clear (transaction);
|
||||
node.node->unchecked.clear ();
|
||||
std::cout << "Unchecked blocks deleted" << std::endl;
|
||||
}
|
||||
else
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#include <nano/lib/config.hpp>
|
||||
#include <nano/lib/json_error_response.hpp>
|
||||
#include <nano/lib/timer.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/election.hpp>
|
||||
|
@ -1422,7 +1423,7 @@ void nano::json_handler::block_account ()
|
|||
void nano::json_handler::block_count ()
|
||||
{
|
||||
response_l.put ("count", std::to_string (node.ledger.cache.block_count));
|
||||
response_l.put ("unchecked", std::to_string (node.unchecked.count (node.store.tx_begin_read ())));
|
||||
response_l.put ("unchecked", std::to_string (node.unchecked.count ()));
|
||||
response_l.put ("cemented", std::to_string (node.ledger.cache.cemented_count));
|
||||
if (node.flags.enable_pruning)
|
||||
{
|
||||
|
@ -4115,7 +4116,7 @@ void nano::json_handler::unchecked ()
|
|||
boost::property_tree::ptree unchecked;
|
||||
auto transaction (node.store.tx_begin_read ());
|
||||
node.unchecked.for_each (
|
||||
transaction, [&unchecked, &json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
[&unchecked, &json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
if (json_block_l)
|
||||
{
|
||||
boost::property_tree::ptree block_node_l;
|
||||
|
@ -4137,7 +4138,7 @@ void nano::json_handler::unchecked_clear ()
|
|||
{
|
||||
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
|
||||
auto transaction (rpc_l->node.store.tx_begin_write ({ tables::unchecked }));
|
||||
rpc_l->node.unchecked.clear (transaction);
|
||||
rpc_l->node.unchecked.clear ();
|
||||
rpc_l->response_l.put ("success", "");
|
||||
rpc_l->response_errors ();
|
||||
}));
|
||||
|
@ -4151,7 +4152,7 @@ void nano::json_handler::unchecked_get ()
|
|||
{
|
||||
bool done = false;
|
||||
node.unchecked.for_each (
|
||||
node.store.tx_begin_read (), [&] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
[&] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
if (key.hash == hash)
|
||||
{
|
||||
response_l.put ("modified_timestamp", std::to_string (info.modified ()));
|
||||
|
@ -4196,7 +4197,8 @@ void nano::json_handler::unchecked_keys ()
|
|||
boost::property_tree::ptree unchecked;
|
||||
auto transaction (node.store.tx_begin_read ());
|
||||
node.unchecked.for_each (
|
||||
transaction, key, [&unchecked, json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
key,
|
||||
[&unchecked, json_block_l] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
boost::property_tree::ptree entry;
|
||||
entry.put ("key", key.key ().to_string ());
|
||||
entry.put ("hash", info.block->hash ().to_string ());
|
||||
|
@ -5219,6 +5221,40 @@ void nano::json_handler::populate_backlog ()
|
|||
response_errors ();
|
||||
}
|
||||
|
||||
void nano::json_handler::debug_bootstrap_priority_info ()
|
||||
{
|
||||
if (!ec)
|
||||
{
|
||||
auto [blocking, priorities] = node.ascendboot.info ();
|
||||
|
||||
// priorities
|
||||
{
|
||||
boost::property_tree::ptree response_priorities;
|
||||
for (auto const & entry : priorities)
|
||||
{
|
||||
const auto account = entry.account;
|
||||
const auto priority = entry.priority;
|
||||
|
||||
response_priorities.put (account.to_account (), priority);
|
||||
}
|
||||
response_l.add_child ("priorities", response_priorities);
|
||||
}
|
||||
// blocking
|
||||
{
|
||||
boost::property_tree::ptree response_blocking;
|
||||
for (auto const & entry : blocking)
|
||||
{
|
||||
const auto account = entry.account;
|
||||
const auto dependency = entry.dependency;
|
||||
|
||||
response_blocking.put (account.to_account (), dependency.to_string ());
|
||||
}
|
||||
response_l.add_child ("blocking", response_blocking);
|
||||
}
|
||||
}
|
||||
response_errors ();
|
||||
}
|
||||
|
||||
void nano::inprocess_rpc_handler::process_request (std::string const &, std::string const & body_a, std::function<void (std::string const &)> response_a)
|
||||
{
|
||||
// Note that if the rpc action is async, the shared_ptr<json_handler> lifetime will be extended by the action handler
|
||||
|
@ -5384,6 +5420,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map ()
|
|||
no_arg_funcs.emplace ("work_peers", &nano::json_handler::work_peers);
|
||||
no_arg_funcs.emplace ("work_peers_clear", &nano::json_handler::work_peers_clear);
|
||||
no_arg_funcs.emplace ("populate_backlog", &nano::json_handler::populate_backlog);
|
||||
no_arg_funcs.emplace ("debug_bootstrap_priority_info", &nano::json_handler::debug_bootstrap_priority_info);
|
||||
return no_arg_funcs;
|
||||
}
|
||||
|
||||
|
|
|
@ -65,6 +65,7 @@ public:
|
|||
void confirmation_info ();
|
||||
void confirmation_quorum ();
|
||||
void confirmation_height_currently_processing ();
|
||||
void debug_bootstrap_priority_info ();
|
||||
void database_txn_tracker ();
|
||||
void delegators ();
|
||||
void delegators_count ();
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#include <nano/crypto_lib/random_pool_shuffle.hpp>
|
||||
#include <nano/lib/threading.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
||||
#include <nano/node/network.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/telemetry.hpp>
|
||||
|
@ -511,7 +512,7 @@ public:
|
|||
|
||||
void asc_pull_ack (nano::asc_pull_ack const & message) override
|
||||
{
|
||||
// TODO: Process in ascending bootstrap client
|
||||
node.ascendboot.process (message);
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
@ -111,7 +111,7 @@ public:
|
|||
void random_fill (std::array<nano::endpoint, 8> &) const;
|
||||
void fill_keepalive_self (std::array<nano::endpoint, 8> &) const;
|
||||
// Note: The minimum protocol version is used after the random selection, so number of peers can be less than expected.
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t, uint8_t = 0, bool = false) const;
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t count, uint8_t min_version = 0, bool include_temporary_channels = false) const;
|
||||
// Get the next peer for attempting a tcp bootstrap connection
|
||||
nano::tcp_endpoint bootstrap_peer ();
|
||||
nano::endpoint endpoint () const;
|
||||
|
|
|
@ -201,6 +201,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
|
|||
aggregator (config, stats, generator, final_generator, history, ledger, wallets, active),
|
||||
wallets (wallets_store.init_error (), *this),
|
||||
backlog{ nano::backlog_population_config (config), store, stats },
|
||||
ascendboot{ *this, store, block_processor, ledger, network, stats },
|
||||
websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger },
|
||||
epoch_upgrader{ *this, ledger, store, network_params, logger },
|
||||
startup_time (std::chrono::steady_clock::now ()),
|
||||
|
@ -214,10 +215,9 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
|
|||
block_publisher.connect (block_processor);
|
||||
gap_tracker.connect (block_processor);
|
||||
process_live_dispatcher.connect (block_processor);
|
||||
unchecked.use_memory = [this] () { return ledger.bootstrap_weight_reached (); };
|
||||
unchecked.satisfied = [this] (nano::unchecked_info const & info) {
|
||||
unchecked.satisfied.add ([this] (nano::unchecked_info const & info) {
|
||||
this->block_processor.add (info.block);
|
||||
};
|
||||
});
|
||||
|
||||
inactive_vote_cache.rep_weight_query = [this] (nano::account const & rep) {
|
||||
return ledger.weight (rep);
|
||||
|
@ -442,7 +442,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
|
|||
if (!flags.disable_unchecked_drop && !use_bootstrap_weight && !flags.read_only)
|
||||
{
|
||||
auto const transaction (store.tx_begin_write ({ tables::unchecked }));
|
||||
unchecked.clear (transaction);
|
||||
unchecked.clear ();
|
||||
logger.always_log ("Dropping unchecked blocks");
|
||||
}
|
||||
}
|
||||
|
@ -585,6 +585,8 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
|
|||
composite->add_component (node.inactive_vote_cache.collect_container_info ("inactive_vote_cache"));
|
||||
composite->add_component (collect_container_info (node.generator, "vote_generator"));
|
||||
composite->add_component (collect_container_info (node.final_generator, "vote_generator_final"));
|
||||
composite->add_component (node.ascendboot.collect_container_info ("bootstrap_ascending"));
|
||||
composite->add_component (node.unchecked.collect_container_info ("unchecked"));
|
||||
return composite;
|
||||
}
|
||||
|
||||
|
@ -696,6 +698,10 @@ void nano::node::start ()
|
|||
backlog.start ();
|
||||
hinting.start ();
|
||||
bootstrap_server.start ();
|
||||
if (!flags.disable_ascending_bootstrap)
|
||||
{
|
||||
ascendboot.start ();
|
||||
}
|
||||
websocket.start ();
|
||||
telemetry.start ();
|
||||
}
|
||||
|
@ -714,6 +720,10 @@ void nano::node::stop ()
|
|||
// No tasks may wait for work generation in I/O threads, or termination signal capturing will be unable to call node::stop()
|
||||
distributed_work.stop ();
|
||||
backlog.stop ();
|
||||
if (!flags.disable_ascending_bootstrap)
|
||||
{
|
||||
ascendboot.stop ();
|
||||
}
|
||||
unchecked.stop ();
|
||||
block_processor.stop ();
|
||||
aggregator.stop ();
|
||||
|
@ -971,7 +981,7 @@ void nano::node::unchecked_cleanup ()
|
|||
auto const transaction (store.tx_begin_read ());
|
||||
// Max 1M records to clean, max 2 minutes reading to prevent slow i/o systems issues
|
||||
unchecked.for_each (
|
||||
transaction, [this, &digests, &cleaning_list, &now] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
[this, &digests, &cleaning_list, &now] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
if ((now - info.modified ()) > static_cast<uint64_t> (config.unchecked_cutoff_time.count ()))
|
||||
{
|
||||
digests.push_back (network.publish_filter.hash (info.block));
|
||||
|
@ -991,9 +1001,9 @@ void nano::node::unchecked_cleanup ()
|
|||
{
|
||||
auto key (cleaning_list.front ());
|
||||
cleaning_list.pop_front ();
|
||||
if (unchecked.exists (transaction, key))
|
||||
if (unchecked.exists (key))
|
||||
{
|
||||
unchecked.del (transaction, key);
|
||||
unchecked.del (key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1497,7 +1507,7 @@ nano::telemetry_data nano::node::local_telemetry () const
|
|||
telemetry_data.bandwidth_cap = config.bandwidth_limit;
|
||||
telemetry_data.protocol_version = network_params.network.protocol_version;
|
||||
telemetry_data.uptime = std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now () - startup_time).count ();
|
||||
telemetry_data.unchecked_count = unchecked.count (ledger.store.tx_begin_read ());
|
||||
telemetry_data.unchecked_count = unchecked.count ();
|
||||
telemetry_data.genesis_block = network_params.ledger.genesis->hash ();
|
||||
telemetry_data.peer_count = nano::narrow_cast<decltype (telemetry_data.peer_count)> (network.size ());
|
||||
telemetry_data.account_count = ledger.cache.account_count;
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
#include <nano/node/block_publisher.hpp>
|
||||
#include <nano/node/blockprocessor.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_server.hpp>
|
||||
#include <nano/node/confirmation_height_processor.hpp>
|
||||
|
@ -192,6 +193,7 @@ public:
|
|||
nano::request_aggregator aggregator;
|
||||
nano::wallets wallets;
|
||||
nano::backlog_population backlog;
|
||||
nano::bootstrap_ascending ascendboot;
|
||||
nano::websocket_server websocket;
|
||||
nano::epoch_upgrader epoch_upgrader;
|
||||
nano::block_broadcast block_broadcast;
|
||||
|
|
|
@ -101,8 +101,8 @@ public:
|
|||
std::size_t bandwidth_limit{ 10 * 1024 * 1024 };
|
||||
/** By default, allow bursts of 15MB/s (not sustainable) */
|
||||
double bandwidth_limit_burst_ratio{ 3. };
|
||||
/** Default boostrap outbound traffic limit is 16MB/s ~ 128Mbit/s */
|
||||
std::size_t bootstrap_bandwidth_limit{ 16 * 1024 * 1024 };
|
||||
/** Default boostrap outbound traffic limit is 5MB/s */
|
||||
std::size_t bootstrap_bandwidth_limit{ 5 * 1024 * 1024 };
|
||||
/** Bootstrap traffic does not need bursts */
|
||||
double bootstrap_bandwidth_burst_ratio{ 1. };
|
||||
std::chrono::milliseconds conf_height_processor_batch_min_time{ 50 };
|
||||
|
@ -140,6 +140,7 @@ public:
|
|||
bool disable_bootstrap_bulk_pull_server{ false };
|
||||
bool disable_bootstrap_bulk_push_client{ false };
|
||||
bool disable_ongoing_bootstrap{ false }; // For testing only
|
||||
bool disable_ascending_bootstrap{ false };
|
||||
bool disable_rep_crawler{ false };
|
||||
bool disable_request_loop{ false }; // For testing only
|
||||
bool disable_tcp_realtime{ false };
|
||||
|
|
|
@ -189,6 +189,11 @@ std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::t
|
|||
auto index (nano::random_pool::generate_word32 (0, static_cast<CryptoPP::word32> (peers_size - 1)));
|
||||
|
||||
auto channel = channels.get<random_access_tag> ()[index].channel;
|
||||
if (!channel->alive ())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (channel->get_network_version () >= min_version && (include_temporary_channels_a || !channel->temporary))
|
||||
{
|
||||
result.insert (channel);
|
||||
|
|
|
@ -22,113 +22,66 @@ nano::unchecked_map::~unchecked_map ()
|
|||
|
||||
void nano::unchecked_map::put (nano::hash_or_account const & dependency, nano::unchecked_info const & info)
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
buffer.push_back (std::make_pair (dependency, info));
|
||||
lock.unlock ();
|
||||
nano::lock_guard<std::recursive_mutex> lock{ entries_mutex };
|
||||
nano::unchecked_key key{ dependency, info.block->hash () };
|
||||
entries.get<tag_root> ().insert ({ key, info });
|
||||
if (entries.size () > mem_block_count_max)
|
||||
{
|
||||
entries.get<tag_sequenced> ().pop_front ();
|
||||
}
|
||||
stats.inc (nano::stat::type::unchecked, nano::stat::detail::put);
|
||||
condition.notify_all (); // Notify run ()
|
||||
}
|
||||
|
||||
void nano::unchecked_map::for_each (
|
||||
nano::transaction const & transaction, std::function<void (nano::unchecked_key const &, nano::unchecked_info const &)> action, std::function<bool ()> predicate)
|
||||
void nano::unchecked_map::for_each (std::function<void (nano::unchecked_key const &, nano::unchecked_info const &)> action, std::function<bool ()> predicate)
|
||||
{
|
||||
nano::lock_guard<std::recursive_mutex> lock{ entries_mutex };
|
||||
if (entries == nullptr)
|
||||
for (auto i = entries.begin (), n = entries.end (); predicate () && i != n; ++i)
|
||||
{
|
||||
for (auto [i, n] = store.unchecked.full_range (transaction); predicate () && i != n; ++i)
|
||||
{
|
||||
action (i->first, i->second);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto i = entries->begin (), n = entries->end (); predicate () && i != n; ++i)
|
||||
{
|
||||
action (i->key, i->info);
|
||||
}
|
||||
action (i->key, i->info);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::unchecked_map::for_each (
|
||||
nano::transaction const & transaction, nano::hash_or_account const & dependency, std::function<void (nano::unchecked_key const &, nano::unchecked_info const &)> action, std::function<bool ()> predicate)
|
||||
void nano::unchecked_map::for_each (nano::hash_or_account const & dependency, std::function<void (nano::unchecked_key const &, nano::unchecked_info const &)> action, std::function<bool ()> predicate)
|
||||
{
|
||||
nano::lock_guard<std::recursive_mutex> lock{ entries_mutex };
|
||||
if (entries == nullptr)
|
||||
for (auto i = entries.template get<tag_root> ().lower_bound (nano::unchecked_key{ dependency, 0 }), n = entries.template get<tag_root> ().end (); predicate () && i != n && i->key.key () == dependency.as_block_hash (); ++i)
|
||||
{
|
||||
for (auto [i, n] = store.unchecked.equal_range (transaction, dependency.as_block_hash ()); predicate () && i->first.key () == dependency.as_block_hash () && i != n; ++i)
|
||||
{
|
||||
action (i->first, i->second);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto i = entries->template get<tag_root> ().lower_bound (nano::unchecked_key{ dependency, 0 }), n = entries->template get<tag_root> ().end (); predicate () && i != n && i->key.key () == dependency.as_block_hash (); ++i)
|
||||
{
|
||||
action (i->key, i->info);
|
||||
}
|
||||
action (i->key, i->info);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<nano::unchecked_info> nano::unchecked_map::get (nano::transaction const & transaction, nano::block_hash const & hash)
|
||||
std::vector<nano::unchecked_info> nano::unchecked_map::get (nano::block_hash const & hash)
|
||||
{
|
||||
std::vector<nano::unchecked_info> result;
|
||||
for_each (transaction, hash, [&result] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
for_each (hash, [&result] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
result.push_back (info);
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
bool nano::unchecked_map::exists (nano::transaction const & transaction, nano::unchecked_key const & key) const
|
||||
bool nano::unchecked_map::exists (nano::unchecked_key const & key) const
|
||||
{
|
||||
nano::lock_guard<std::recursive_mutex> lock{ entries_mutex };
|
||||
if (entries == nullptr)
|
||||
{
|
||||
return store.unchecked.exists (transaction, key);
|
||||
}
|
||||
else
|
||||
{
|
||||
return entries->template get<tag_root> ().count (key) != 0;
|
||||
}
|
||||
return entries.get<tag_root> ().count (key) != 0;
|
||||
}
|
||||
|
||||
void nano::unchecked_map::del (nano::write_transaction const & transaction, nano::unchecked_key const & key)
|
||||
void nano::unchecked_map::del (nano::unchecked_key const & key)
|
||||
{
|
||||
nano::lock_guard<std::recursive_mutex> lock{ entries_mutex };
|
||||
if (entries == nullptr)
|
||||
{
|
||||
store.unchecked.del (transaction, key);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto erased = entries->template get<tag_root> ().erase (key);
|
||||
release_assert (erased);
|
||||
}
|
||||
auto erased = entries.get<tag_root> ().erase (key);
|
||||
debug_assert (erased);
|
||||
}
|
||||
|
||||
void nano::unchecked_map::clear (nano::write_transaction const & transaction)
|
||||
void nano::unchecked_map::clear ()
|
||||
{
|
||||
nano::lock_guard<std::recursive_mutex> lock{ entries_mutex };
|
||||
if (entries == nullptr)
|
||||
{
|
||||
store.unchecked.clear (transaction);
|
||||
}
|
||||
else
|
||||
{
|
||||
entries->clear ();
|
||||
}
|
||||
entries.clear ();
|
||||
}
|
||||
|
||||
size_t nano::unchecked_map::count (nano::transaction const & transaction) const
|
||||
std::size_t nano::unchecked_map::count () const
|
||||
{
|
||||
nano::lock_guard<std::recursive_mutex> lock{ entries_mutex };
|
||||
if (entries == nullptr)
|
||||
{
|
||||
return store.unchecked.count (transaction);
|
||||
}
|
||||
else
|
||||
{
|
||||
return entries->size ();
|
||||
}
|
||||
return entries.size ();
|
||||
}
|
||||
|
||||
void nano::unchecked_map::stop ()
|
||||
|
@ -152,37 +105,18 @@ void nano::unchecked_map::flush ()
|
|||
void nano::unchecked_map::trigger (nano::hash_or_account const & dependency)
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
buffer.push_back (dependency);
|
||||
debug_assert (buffer.back ().which () == 1); // which stands for "query".
|
||||
buffer.emplace_back (dependency);
|
||||
lock.unlock ();
|
||||
stats.inc (nano::stat::type::unchecked, nano::stat::detail::trigger);
|
||||
condition.notify_all (); // Notify run ()
|
||||
stats.inc (nano::stat::type::unchecked, nano::stat::detail::trigger);
|
||||
}
|
||||
|
||||
nano::unchecked_map::item_visitor::item_visitor (unchecked_map & unchecked, nano::write_transaction const & transaction) :
|
||||
unchecked{ unchecked },
|
||||
transaction{ transaction }
|
||||
void nano::unchecked_map::process_queries (decltype (buffer) const & back_buffer)
|
||||
{
|
||||
}
|
||||
|
||||
void nano::unchecked_map::item_visitor::operator() (insert const & item)
|
||||
{
|
||||
auto const & [dependency, info] = item;
|
||||
unchecked.insert_impl (transaction, dependency, info);
|
||||
}
|
||||
|
||||
void nano::unchecked_map::item_visitor::operator() (query const & item)
|
||||
{
|
||||
unchecked.query_impl (transaction, item.hash);
|
||||
}
|
||||
|
||||
void nano::unchecked_map::write_buffer (decltype (buffer) const & back_buffer)
|
||||
{
|
||||
auto transaction = store.tx_begin_write ();
|
||||
item_visitor visitor{ *this, transaction };
|
||||
for (auto const & item : back_buffer)
|
||||
{
|
||||
boost::apply_visitor (visitor, item);
|
||||
query_impl (item.hash);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -197,7 +131,7 @@ void nano::unchecked_map::run ()
|
|||
back_buffer.swap (buffer);
|
||||
writing_back_buffer = true;
|
||||
lock.unlock ();
|
||||
write_buffer (back_buffer);
|
||||
process_queries (back_buffer);
|
||||
lock.lock ();
|
||||
writing_back_buffer = false;
|
||||
back_buffer.clear ();
|
||||
|
@ -212,54 +146,29 @@ void nano::unchecked_map::run ()
|
|||
}
|
||||
}
|
||||
|
||||
void nano::unchecked_map::insert_impl (nano::write_transaction const & transaction, nano::hash_or_account const & dependency, nano::unchecked_info const & info)
|
||||
void nano::unchecked_map::query_impl (nano::block_hash const & hash)
|
||||
{
|
||||
// Check if block dependency has been satisfied while waiting to be placed in the unchecked map
|
||||
if (store.block.exists (transaction, dependency.as_block_hash ()))
|
||||
{
|
||||
satisfied (info);
|
||||
return;
|
||||
}
|
||||
nano::lock_guard<std::recursive_mutex> lock{ entries_mutex };
|
||||
// Check if we should be using memory but the memory container hasn't been constructed i.e. we're transitioning from disk to memory.
|
||||
if (entries == nullptr && use_memory ())
|
||||
{
|
||||
auto entries_new = std::make_unique<typename decltype (entries)::element_type> ();
|
||||
for_each (
|
||||
transaction, [&entries_new] (nano::unchecked_key const & key, nano::unchecked_info const & info) { entries_new->template get<tag_root> ().insert ({ key, info }); }, [&] () { return entries_new->size () < mem_block_count_max; });
|
||||
clear (transaction);
|
||||
entries = std::move (entries_new);
|
||||
}
|
||||
if (entries == nullptr)
|
||||
{
|
||||
store.unchecked.put (transaction, dependency, info);
|
||||
}
|
||||
else
|
||||
{
|
||||
nano::unchecked_key key{ dependency, info.block->hash () };
|
||||
entries->template get<tag_root> ().insert ({ key, info });
|
||||
while (entries->size () > mem_block_count_max)
|
||||
{
|
||||
entries->template get<tag_sequenced> ().pop_front ();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::unchecked_map::query_impl (nano::write_transaction const & transaction, nano::block_hash const & hash)
|
||||
{
|
||||
nano::lock_guard<std::recursive_mutex> lock{ entries_mutex };
|
||||
|
||||
std::deque<nano::unchecked_key> delete_queue;
|
||||
for_each (transaction, hash, [this, &delete_queue] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
for_each (hash, [this, &delete_queue] (nano::unchecked_key const & key, nano::unchecked_info const & info) {
|
||||
delete_queue.push_back (key);
|
||||
stats.inc (nano::stat::type::unchecked, nano::stat::detail::satisfied);
|
||||
satisfied (info);
|
||||
satisfied.notify (info);
|
||||
});
|
||||
if (!disable_delete)
|
||||
{
|
||||
for (auto const & key : delete_queue)
|
||||
{
|
||||
del (transaction, key);
|
||||
del (key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::container_info_component> nano::unchecked_map::collect_container_info (const std::string & name)
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
|
||||
auto composite = std::make_unique<container_info_composite> (name);
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "entries", entries.size (), sizeof (decltype (entries)::value_type) }));
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queries", buffer.size (), sizeof (decltype (buffer)::value_type) }));
|
||||
return composite;
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
#include <nano/lib/locks.hpp>
|
||||
#include <nano/lib/numbers.hpp>
|
||||
#include <nano/lib/observer_set.hpp>
|
||||
#include <nano/secure/store.hpp>
|
||||
|
||||
#include <boost/multi_index/member.hpp>
|
||||
|
@ -31,60 +32,50 @@ public:
|
|||
|
||||
void put (nano::hash_or_account const & dependency, nano::unchecked_info const & info);
|
||||
void for_each (
|
||||
nano::transaction const & transaction, std::function<void (nano::unchecked_key const &, nano::unchecked_info const &)> action, std::function<bool ()> predicate = [] () { return true; });
|
||||
std::function<void (nano::unchecked_key const &, nano::unchecked_info const &)> action, std::function<bool ()> predicate = [] () { return true; });
|
||||
void for_each (
|
||||
nano::transaction const & transaction, nano::hash_or_account const & dependency, std::function<void (nano::unchecked_key const &, nano::unchecked_info const &)> action, std::function<bool ()> predicate = [] () { return true; });
|
||||
std::vector<nano::unchecked_info> get (nano::transaction const &, nano::block_hash const &);
|
||||
bool exists (nano::transaction const & transaction, nano::unchecked_key const & key) const;
|
||||
void del (nano::write_transaction const & transaction, nano::unchecked_key const & key);
|
||||
void clear (nano::write_transaction const & transaction);
|
||||
size_t count (nano::transaction const & transaction) const;
|
||||
nano::hash_or_account const & dependency, std::function<void (nano::unchecked_key const &, nano::unchecked_info const &)> action, std::function<bool ()> predicate = [] () { return true; });
|
||||
std::vector<nano::unchecked_info> get (nano::block_hash const &);
|
||||
bool exists (nano::unchecked_key const & key) const;
|
||||
void del (nano::unchecked_key const & key);
|
||||
void clear ();
|
||||
std::size_t count () const;
|
||||
void stop ();
|
||||
void flush ();
|
||||
|
||||
std::function<bool ()> use_memory = [] () { return true; };
|
||||
|
||||
public: // Trigger requested dependencies
|
||||
/**
|
||||
* Trigger requested dependencies
|
||||
*/
|
||||
void trigger (nano::hash_or_account const & dependency);
|
||||
std::function<void (nano::unchecked_info const &)> satisfied{ [] (nano::unchecked_info const &) {} };
|
||||
|
||||
public: // Events
|
||||
nano::observer_set<nano::unchecked_info const &> satisfied;
|
||||
|
||||
private:
|
||||
using insert = std::pair<nano::hash_or_account, nano::unchecked_info>;
|
||||
using query = nano::hash_or_account;
|
||||
class item_visitor : boost::static_visitor<>
|
||||
{
|
||||
public:
|
||||
item_visitor (unchecked_map & unchecked, nano::write_transaction const & transaction);
|
||||
void operator() (insert const & item);
|
||||
void operator() (query const & item);
|
||||
unchecked_map & unchecked;
|
||||
nano::write_transaction const & transaction;
|
||||
};
|
||||
void run ();
|
||||
void insert_impl (nano::write_transaction const & transaction, nano::hash_or_account const & dependency, nano::unchecked_info const & info);
|
||||
void query_impl (nano::write_transaction const & transaction, nano::block_hash const & hash);
|
||||
void query_impl (nano::block_hash const & hash);
|
||||
|
||||
private: // Dependencies
|
||||
nano::store & store;
|
||||
nano::stats & stats;
|
||||
|
||||
private:
|
||||
bool const & disable_delete;
|
||||
std::deque<boost::variant<insert, query>> buffer;
|
||||
std::deque<boost::variant<insert, query>> back_buffer;
|
||||
std::deque<nano::hash_or_account> buffer;
|
||||
std::deque<nano::hash_or_account> back_buffer;
|
||||
bool writing_back_buffer{ false };
|
||||
bool stopped{ false };
|
||||
nano::condition_variable condition;
|
||||
nano::mutex mutex;
|
||||
std::thread thread;
|
||||
void write_buffer (decltype (buffer) const & back_buffer);
|
||||
|
||||
static size_t constexpr mem_block_count_max = 256'000;
|
||||
void process_queries (decltype (buffer) const & back_buffer);
|
||||
|
||||
friend class item_visitor;
|
||||
static std::size_t constexpr mem_block_count_max = 64 * 1024;
|
||||
|
||||
private: // In memory store
|
||||
class entry
|
||||
private:
|
||||
struct entry
|
||||
{
|
||||
public:
|
||||
nano::unchecked_key key;
|
||||
nano::unchecked_info info;
|
||||
};
|
||||
|
@ -99,8 +90,11 @@ private: // In memory store
|
|||
mi::ordered_unique<mi::tag<tag_root>,
|
||||
mi::member<entry, nano::unchecked_key, &entry::key>>>>;
|
||||
// clang-format on
|
||||
std::unique_ptr<ordered_unchecked> entries;
|
||||
ordered_unchecked entries;
|
||||
|
||||
mutable std::recursive_mutex entries_mutex;
|
||||
|
||||
public: // Container info
|
||||
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
|
||||
};
|
||||
}
|
||||
|
|
|
@ -923,7 +923,7 @@ std::string nano_qt::status::text ()
|
|||
std::string count_string;
|
||||
{
|
||||
auto size (wallet.wallet_m->wallets.node.ledger.cache.block_count.load ());
|
||||
unchecked = wallet.wallet_m->wallets.node.unchecked.count (wallet.wallet_m->wallets.node.store.tx_begin_read ());
|
||||
unchecked = wallet.wallet_m->wallets.node.unchecked.count ();
|
||||
cemented = wallet.wallet_m->wallets.node.ledger.cache.cemented_count.load ();
|
||||
count_string = std::to_string (size);
|
||||
}
|
||||
|
|
|
@ -153,6 +153,7 @@ std::unordered_set<std::string> create_rpc_control_impls ()
|
|||
set.emplace ("account_remove");
|
||||
set.emplace ("account_representative_set");
|
||||
set.emplace ("accounts_create");
|
||||
set.emplace ("backoff_info");
|
||||
set.emplace ("block_create");
|
||||
set.emplace ("bootstrap_lazy");
|
||||
set.emplace ("confirmation_height_currently_processing");
|
||||
|
|
|
@ -6094,7 +6094,7 @@ TEST (rpc, unchecked)
|
|||
node->process_active (open);
|
||||
node->process_active (open2);
|
||||
// Waits for the last block of the queue to get saved in the database
|
||||
ASSERT_TIMELY (10s, 2 == node->unchecked.count (node->store.tx_begin_read ()));
|
||||
ASSERT_TIMELY (10s, 2 == node->unchecked.count ());
|
||||
boost::property_tree::ptree request;
|
||||
request.put ("action", "unchecked");
|
||||
request.put ("count", 2);
|
||||
|
@ -6135,7 +6135,7 @@ TEST (rpc, unchecked_get)
|
|||
.build_shared ();
|
||||
node->process_active (open);
|
||||
// Waits for the open block to get saved in the database
|
||||
ASSERT_TIMELY (10s, 1 == node->unchecked.count (node->store.tx_begin_read ()));
|
||||
ASSERT_TIMELY (10s, 1 == node->unchecked.count ());
|
||||
boost::property_tree::ptree request{};
|
||||
request.put ("action", "unchecked_get");
|
||||
request.put ("hash", open->hash ().to_string ());
|
||||
|
@ -6175,12 +6175,12 @@ TEST (rpc, unchecked_clear)
|
|||
node->process_active (open);
|
||||
boost::property_tree::ptree request{};
|
||||
// Waits for the open block to get saved in the database
|
||||
ASSERT_TIMELY (10s, 1 == node->unchecked.count (node->store.tx_begin_read ()));
|
||||
ASSERT_TIMELY (10s, 1 == node->unchecked.count ());
|
||||
request.put ("action", "unchecked_clear");
|
||||
auto response = wait_response (system, rpc_ctx, request);
|
||||
|
||||
// Waits for the open block to get saved in the database
|
||||
ASSERT_TIMELY (10s, 0 == node->unchecked.count (node->store.tx_begin_read ()));
|
||||
ASSERT_TIMELY (10s, 0 == node->unchecked.count ());
|
||||
}
|
||||
|
||||
TEST (rpc, unopened)
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
add_executable(slow_test entry.cpp node.cpp vote_cache.cpp vote_processor.cpp)
|
||||
add_executable(slow_test entry.cpp node.cpp vote_cache.cpp vote_processor.cpp
|
||||
bootstrap.cpp)
|
||||
|
||||
target_link_libraries(slow_test secure node test_common gtest
|
||||
libminiupnpc-static)
|
||||
|
|
200
nano/slow_test/bootstrap.cpp
Normal file
200
nano/slow_test/bootstrap.cpp
Normal file
|
@ -0,0 +1,200 @@
|
|||
#include <nano/lib/rpcconfig.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_server.hpp>
|
||||
#include <nano/node/ipc/ipc_server.hpp>
|
||||
#include <nano/node/json_handler.hpp>
|
||||
#include <nano/node/transport/transport.hpp>
|
||||
#include <nano/rpc/rpc.hpp>
|
||||
#include <nano/rpc/rpc_request_processor.hpp>
|
||||
#include <nano/test_common/network.hpp>
|
||||
#include <nano/test_common/rate_observer.hpp>
|
||||
#include <nano/test_common/system.hpp>
|
||||
#include <nano/test_common/testutil.hpp>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
#include <map>
|
||||
#include <thread>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace
|
||||
{
|
||||
void wait_for_key ()
|
||||
{
|
||||
int junk;
|
||||
std::cin >> junk;
|
||||
}
|
||||
|
||||
class rpc_wrapper
|
||||
{
|
||||
public:
|
||||
rpc_wrapper (nano::test::system & system, nano::node & node, uint16_t port) :
|
||||
node_rpc_config{},
|
||||
rpc_config{ node.network_params.network, port, true },
|
||||
ipc{ node, node_rpc_config },
|
||||
ipc_rpc_processor{ system.io_ctx, rpc_config },
|
||||
rpc{ system.io_ctx, rpc_config, ipc_rpc_processor }
|
||||
{
|
||||
}
|
||||
|
||||
void start ()
|
||||
{
|
||||
rpc.start ();
|
||||
}
|
||||
|
||||
public:
|
||||
nano::node_rpc_config node_rpc_config;
|
||||
nano::rpc_config rpc_config;
|
||||
nano::ipc::ipc_server ipc;
|
||||
nano::ipc_rpc_processor ipc_rpc_processor;
|
||||
nano::rpc rpc;
|
||||
};
|
||||
|
||||
std::unique_ptr<rpc_wrapper> start_rpc (nano::test::system & system, nano::node & node, uint16_t port)
|
||||
{
|
||||
auto rpc = std::make_unique<rpc_wrapper> (system, node, port);
|
||||
rpc->start ();
|
||||
return rpc;
|
||||
}
|
||||
}
|
||||
|
||||
TEST (bootstrap_ascending, profile)
|
||||
{
|
||||
nano::test::system system;
|
||||
nano::thread_runner runner{ system.io_ctx, 2 };
|
||||
nano::networks network = nano::networks::nano_beta_network;
|
||||
nano::network_params network_params{ network };
|
||||
|
||||
// Set up client and server nodes
|
||||
nano::node_config config_server{ network_params };
|
||||
config_server.preconfigured_peers.clear ();
|
||||
config_server.bandwidth_limit = 0; // Unlimited server bandwidth
|
||||
nano::node_flags flags_server;
|
||||
flags_server.disable_legacy_bootstrap = true;
|
||||
flags_server.disable_wallet_bootstrap = true;
|
||||
flags_server.disable_add_initial_peers = true;
|
||||
flags_server.disable_ongoing_bootstrap = true;
|
||||
flags_server.disable_ascending_bootstrap = true;
|
||||
auto data_path_server = nano::working_path (network);
|
||||
//auto data_path_server = "";
|
||||
auto server = std::make_shared<nano::node> (system.io_ctx, data_path_server, config_server, system.work, flags_server);
|
||||
system.nodes.push_back (server);
|
||||
server->start ();
|
||||
|
||||
nano::node_config config_client{ network_params };
|
||||
config_client.preconfigured_peers.clear ();
|
||||
config_client.bandwidth_limit = 0; // Unlimited server bandwidth
|
||||
nano::node_flags flags_client;
|
||||
flags_client.disable_legacy_bootstrap = true;
|
||||
flags_client.disable_wallet_bootstrap = true;
|
||||
flags_client.disable_add_initial_peers = true;
|
||||
flags_client.disable_ongoing_bootstrap = true;
|
||||
config_client.ipc_config.transport_tcp.enabled = true;
|
||||
// Disable database integrity safety for higher throughput
|
||||
config_client.lmdb_config.sync = nano::lmdb_config::sync_strategy::nosync_unsafe;
|
||||
//auto client = system.add_node (config_client, flags_client);
|
||||
|
||||
// macos 16GB RAM disk: diskutil erasevolume HFS+ "RAMDisk" `hdiutil attach -nomount ram://33554432`
|
||||
//auto data_path_client = "/Volumes/RAMDisk";
|
||||
auto data_path_client = nano::unique_path ();
|
||||
auto client = std::make_shared<nano::node> (system.io_ctx, data_path_client, config_client, system.work, flags_client);
|
||||
system.nodes.push_back (client);
|
||||
client->start ();
|
||||
|
||||
// Set up RPC
|
||||
auto client_rpc = start_rpc (system, *server, 55000);
|
||||
auto server_rpc = start_rpc (system, *client, 55001);
|
||||
|
||||
struct entry
|
||||
{
|
||||
nano::bootstrap_ascending::async_tag tag;
|
||||
std::shared_ptr<nano::transport::channel> request_channel;
|
||||
std::shared_ptr<nano::transport::channel> reply_channel;
|
||||
|
||||
bool replied{ false };
|
||||
bool received{ false };
|
||||
};
|
||||
|
||||
nano::mutex mutex;
|
||||
std::unordered_map<uint64_t, entry> requests;
|
||||
|
||||
server->bootstrap_server.on_response.add ([&] (auto & response, auto & channel) {
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
|
||||
if (requests.count (response.id))
|
||||
{
|
||||
requests[response.id].replied = true;
|
||||
requests[response.id].reply_channel = channel;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << "unknown response: " << response.id << std::endl;
|
||||
}
|
||||
});
|
||||
|
||||
client->ascendboot.on_request.add ([&] (auto & tag, auto & channel) {
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
|
||||
requests[tag.id] = { tag, channel };
|
||||
});
|
||||
|
||||
client->ascendboot.on_reply.add ([&] (auto & tag) {
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
|
||||
requests[tag.id].received = true;
|
||||
});
|
||||
|
||||
/*client->ascendboot.on_timeout.add ([&] (auto & tag) {
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
|
||||
if (requests.count (tag.id))
|
||||
{
|
||||
auto entry = requests[tag.id];
|
||||
|
||||
std::cerr << "timeout: "
|
||||
<< "replied: " << entry.replied
|
||||
<< " | "
|
||||
<< "recevied: " << entry.received
|
||||
<< " | "
|
||||
<< "request: " << entry.request_channel->to_string ()
|
||||
<< " ||| "
|
||||
<< "reply: " << (entry.reply_channel ? entry.reply_channel->to_string () : "null")
|
||||
<< std::endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << "unknown timeout: " << tag.id << std::endl;
|
||||
}
|
||||
});*/
|
||||
|
||||
std::cout << "server count: " << server->ledger.cache.block_count << std::endl;
|
||||
|
||||
nano::test::rate_observer rate;
|
||||
rate.observe ("count", [&] () { return client->ledger.cache.block_count.load (); });
|
||||
rate.observe ("unchecked", [&] () { return client->unchecked.count (); });
|
||||
rate.observe ("block_processor", [&] () { return client->block_processor.size (); });
|
||||
rate.observe ("priority", [&] () { return client->ascendboot.priority_size (); });
|
||||
rate.observe ("blocking", [&] () { return client->ascendboot.blocked_size (); });
|
||||
rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out);
|
||||
rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::reply, nano::stat::dir::in);
|
||||
rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in);
|
||||
rate.observe (*server, nano::stat::type::bootstrap_server, nano::stat::detail::blocks, nano::stat::dir::out);
|
||||
rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::old, nano::stat::dir::in);
|
||||
rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::gap_epoch_open_pending, nano::stat::dir::in);
|
||||
rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::gap_source, nano::stat::dir::in);
|
||||
rate.observe (*client, nano::stat::type::ledger, nano::stat::detail::gap_previous, nano::stat::dir::in);
|
||||
rate.background_print (3s);
|
||||
|
||||
//wait_for_key ();
|
||||
while (true)
|
||||
{
|
||||
nano::test::establish_tcp (system, *client, server->network.endpoint ());
|
||||
std::this_thread::sleep_for (10s);
|
||||
}
|
||||
|
||||
server->stop ();
|
||||
client->stop ();
|
||||
}
|
|
@ -510,7 +510,7 @@ TEST (store, unchecked_load)
|
|||
node.unchecked.put (i, block);
|
||||
}
|
||||
// Waits for all the blocks to get saved in the database
|
||||
ASSERT_TIMELY (8000s, num_unchecked == node.unchecked.count (node.store.tx_begin_read ()));
|
||||
ASSERT_TIMELY (8000s, num_unchecked == node.unchecked.count ());
|
||||
}
|
||||
|
||||
TEST (store, vote_load)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue