Merge pull request #4213 from thsfs/bootstrap_ascending_class_files_split
Bootstrap ascending class files split
This commit is contained in:
commit
27a4c83259
17 changed files with 1338 additions and 1265 deletions
|
|
@ -1,5 +1,5 @@
|
||||||
#include <nano/lib/stats.hpp>
|
#include <nano/lib/stats.hpp>
|
||||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
#include <nano/node/bootstrap_ascending/service.hpp>
|
||||||
#include <nano/test_common/system.hpp>
|
#include <nano/test_common/system.hpp>
|
||||||
#include <nano/test_common/testutil.hpp>
|
#include <nano/test_common/testutil.hpp>
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -34,8 +34,6 @@ add_library(
|
||||||
blockprocessor.cpp
|
blockprocessor.cpp
|
||||||
bootstrap/block_deserializer.hpp
|
bootstrap/block_deserializer.hpp
|
||||||
bootstrap/block_deserializer.cpp
|
bootstrap/block_deserializer.cpp
|
||||||
bootstrap/bootstrap_ascending.hpp
|
|
||||||
bootstrap/bootstrap_ascending.cpp
|
|
||||||
bootstrap/bootstrap_attempt.hpp
|
bootstrap/bootstrap_attempt.hpp
|
||||||
bootstrap/bootstrap_attempt.cpp
|
bootstrap/bootstrap_attempt.cpp
|
||||||
bootstrap/bootstrap_bulk_pull.hpp
|
bootstrap/bootstrap_bulk_pull.hpp
|
||||||
|
|
@ -56,6 +54,15 @@ add_library(
|
||||||
bootstrap/bootstrap.cpp
|
bootstrap/bootstrap.cpp
|
||||||
bootstrap/bootstrap_server.hpp
|
bootstrap/bootstrap_server.hpp
|
||||||
bootstrap/bootstrap_server.cpp
|
bootstrap/bootstrap_server.cpp
|
||||||
|
bootstrap_ascending/common.hpp
|
||||||
|
bootstrap_ascending/throttle.hpp
|
||||||
|
bootstrap_ascending/throttle.cpp
|
||||||
|
bootstrap_ascending/account_sets.hpp
|
||||||
|
bootstrap_ascending/account_sets.cpp
|
||||||
|
bootstrap_ascending/iterators.hpp
|
||||||
|
bootstrap_ascending/iterators.cpp
|
||||||
|
bootstrap_ascending/service.hpp
|
||||||
|
bootstrap_ascending/service.cpp
|
||||||
cli.hpp
|
cli.hpp
|
||||||
cli.cpp
|
cli.cpp
|
||||||
common.hpp
|
common.hpp
|
||||||
|
|
|
||||||
|
|
@ -1,912 +0,0 @@
|
||||||
#include <nano/lib/stats_enums.hpp>
|
|
||||||
#include <nano/node/blockprocessor.hpp>
|
|
||||||
#include <nano/node/bootstrap/block_deserializer.hpp>
|
|
||||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
|
||||||
#include <nano/node/network.hpp>
|
|
||||||
#include <nano/node/nodeconfig.hpp>
|
|
||||||
#include <nano/node/transport/transport.hpp>
|
|
||||||
#include <nano/secure/common.hpp>
|
|
||||||
#include <nano/secure/ledger.hpp>
|
|
||||||
#include <nano/secure/store.hpp>
|
|
||||||
|
|
||||||
#include <boost/format.hpp>
|
|
||||||
|
|
||||||
using namespace std::chrono_literals;
|
|
||||||
|
|
||||||
nano::bootstrap_ascending::throttle::throttle (size_t count) :
|
|
||||||
successes{ count },
|
|
||||||
samples{ count, true }
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
bool nano::bootstrap_ascending::throttle::throttled () const
|
|
||||||
{
|
|
||||||
return successes == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::throttle::add (bool sample)
|
|
||||||
{
|
|
||||||
if (samples.front ())
|
|
||||||
{
|
|
||||||
--successes;
|
|
||||||
}
|
|
||||||
samples.push_back (sample);
|
|
||||||
if (sample)
|
|
||||||
{
|
|
||||||
++successes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* database_iterator
|
|
||||||
*/
|
|
||||||
|
|
||||||
nano::bootstrap_ascending::database_iterator::database_iterator (nano::store & store_a, table_type table_a) :
|
|
||||||
store{ store_a },
|
|
||||||
table{ table_a }
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
nano::account nano::bootstrap_ascending::database_iterator::operator* () const
|
|
||||||
{
|
|
||||||
return current;
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::database_iterator::next (nano::transaction & tx)
|
|
||||||
{
|
|
||||||
switch (table)
|
|
||||||
{
|
|
||||||
case table_type::account:
|
|
||||||
{
|
|
||||||
auto i = current.number () + 1;
|
|
||||||
auto item = store.account.begin (tx, i);
|
|
||||||
if (item != store.account.end ())
|
|
||||||
{
|
|
||||||
current = item->first;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
current = { 0 };
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case table_type::pending:
|
|
||||||
{
|
|
||||||
auto i = current.number () + 1;
|
|
||||||
auto item = store.pending.begin (tx, nano::pending_key{ i, 0 });
|
|
||||||
if (item != store.pending.end ())
|
|
||||||
{
|
|
||||||
current = item->first.account;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
current = { 0 };
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* buffered_iterator
|
|
||||||
*/
|
|
||||||
|
|
||||||
nano::bootstrap_ascending::buffered_iterator::buffered_iterator (nano::store & store_a) :
|
|
||||||
store{ store_a },
|
|
||||||
accounts_iterator{ store, database_iterator::table_type::account },
|
|
||||||
pending_iterator{ store, database_iterator::table_type::pending }
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
nano::account nano::bootstrap_ascending::buffered_iterator::operator* () const
|
|
||||||
{
|
|
||||||
return !buffer.empty () ? buffer.front () : nano::account{ 0 };
|
|
||||||
}
|
|
||||||
|
|
||||||
nano::account nano::bootstrap_ascending::buffered_iterator::next ()
|
|
||||||
{
|
|
||||||
if (!buffer.empty ())
|
|
||||||
{
|
|
||||||
buffer.pop_front ();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
fill ();
|
|
||||||
}
|
|
||||||
|
|
||||||
return *(*this);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool nano::bootstrap_ascending::buffered_iterator::warmup () const
|
|
||||||
{
|
|
||||||
return warmup_m;
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::buffered_iterator::fill ()
|
|
||||||
{
|
|
||||||
debug_assert (buffer.empty ());
|
|
||||||
|
|
||||||
// Fill half from accounts table and half from pending table
|
|
||||||
auto transaction = store.tx_begin_read ();
|
|
||||||
|
|
||||||
for (int n = 0; n < size / 2; ++n)
|
|
||||||
{
|
|
||||||
accounts_iterator.next (transaction);
|
|
||||||
if (!(*accounts_iterator).is_zero ())
|
|
||||||
{
|
|
||||||
buffer.push_back (*accounts_iterator);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int n = 0; n < size / 2; ++n)
|
|
||||||
{
|
|
||||||
pending_iterator.next (transaction);
|
|
||||||
if (!(*pending_iterator).is_zero ())
|
|
||||||
{
|
|
||||||
buffer.push_back (*pending_iterator);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
warmup_m = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* account_sets
|
|
||||||
*/
|
|
||||||
|
|
||||||
nano::bootstrap_ascending::account_sets::account_sets (nano::stats & stats_a, nano::account_sets_config config_a) :
|
|
||||||
stats{ stats_a },
|
|
||||||
config{ std::move (config_a) }
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::account_sets::priority_up (nano::account const & account)
|
|
||||||
{
|
|
||||||
if (!blocked (account))
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize);
|
|
||||||
|
|
||||||
auto iter = priorities.get<tag_account> ().find (account);
|
|
||||||
if (iter != priorities.get<tag_account> ().end ())
|
|
||||||
{
|
|
||||||
priorities.get<tag_account> ().modify (iter, [] (auto & val) {
|
|
||||||
val.priority = std::min ((val.priority * account_sets::priority_increase), account_sets::priority_max);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_insert);
|
|
||||||
|
|
||||||
trim_overflow ();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize_failed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::account_sets::priority_down (nano::account const & account)
|
|
||||||
{
|
|
||||||
auto iter = priorities.get<tag_account> ().find (account);
|
|
||||||
if (iter != priorities.get<tag_account> ().end ())
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize);
|
|
||||||
|
|
||||||
auto priority_new = iter->priority - account_sets::priority_decrease;
|
|
||||||
if (priority_new <= account_sets::priority_cutoff)
|
|
||||||
{
|
|
||||||
priorities.get<tag_account> ().erase (iter);
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_threshold);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
priorities.get<tag_account> ().modify (iter, [priority_new] (auto & val) {
|
|
||||||
val.priority = priority_new;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize_failed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::account_sets::block (nano::account const & account, nano::block_hash const & dependency)
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::block);
|
|
||||||
|
|
||||||
auto existing = priorities.get<tag_account> ().find (account);
|
|
||||||
auto entry = existing == priorities.get<tag_account> ().end () ? priority_entry{ 0, 0 } : *existing;
|
|
||||||
|
|
||||||
priorities.get<tag_account> ().erase (account);
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_block);
|
|
||||||
|
|
||||||
blocking.get<tag_account> ().insert ({ account, dependency, entry });
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_insert);
|
|
||||||
|
|
||||||
trim_overflow ();
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::account_sets::unblock (nano::account const & account, std::optional<nano::block_hash> const & hash)
|
|
||||||
{
|
|
||||||
// Unblock only if the dependency is fulfilled
|
|
||||||
auto existing = blocking.get<tag_account> ().find (account);
|
|
||||||
if (existing != blocking.get<tag_account> ().end () && (!hash || existing->dependency == *hash))
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::unblock);
|
|
||||||
|
|
||||||
debug_assert (priorities.get<tag_account> ().count (account) == 0);
|
|
||||||
if (!existing->original_entry.account.is_zero ())
|
|
||||||
{
|
|
||||||
debug_assert (existing->original_entry.account == account);
|
|
||||||
priorities.get<tag_account> ().insert (existing->original_entry);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
|
|
||||||
}
|
|
||||||
blocking.get<tag_account> ().erase (account);
|
|
||||||
|
|
||||||
trim_overflow ();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::unblock_failed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::account_sets::timestamp (const nano::account & account, bool reset)
|
|
||||||
{
|
|
||||||
const nano::millis_t tstamp = reset ? 0 : nano::milliseconds_since_epoch ();
|
|
||||||
|
|
||||||
auto iter = priorities.get<tag_account> ().find (account);
|
|
||||||
if (iter != priorities.get<tag_account> ().end ())
|
|
||||||
{
|
|
||||||
priorities.get<tag_account> ().modify (iter, [tstamp] (auto & entry) {
|
|
||||||
entry.timestamp = tstamp;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool nano::bootstrap_ascending::account_sets::check_timestamp (const nano::account & account) const
|
|
||||||
{
|
|
||||||
auto iter = priorities.get<tag_account> ().find (account);
|
|
||||||
if (iter != priorities.get<tag_account> ().end ())
|
|
||||||
{
|
|
||||||
if (nano::milliseconds_since_epoch () - iter->timestamp < config.cooldown)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::account_sets::trim_overflow ()
|
|
||||||
{
|
|
||||||
if (priorities.size () > config.priorities_max)
|
|
||||||
{
|
|
||||||
// Evict the lowest priority entry
|
|
||||||
priorities.get<tag_priority> ().erase (priorities.get<tag_priority> ().begin ());
|
|
||||||
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_overflow);
|
|
||||||
}
|
|
||||||
if (blocking.size () > config.blocking_max)
|
|
||||||
{
|
|
||||||
// Evict the lowest priority entry
|
|
||||||
blocking.get<tag_priority> ().erase (blocking.get<tag_priority> ().begin ());
|
|
||||||
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_erase_overflow);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
nano::account nano::bootstrap_ascending::account_sets::next ()
|
|
||||||
{
|
|
||||||
if (priorities.empty ())
|
|
||||||
{
|
|
||||||
return { 0 };
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<float> weights;
|
|
||||||
std::vector<nano::account> candidates;
|
|
||||||
|
|
||||||
int iterations = 0;
|
|
||||||
while (candidates.size () < config.consideration_count && iterations++ < config.consideration_count * 10)
|
|
||||||
{
|
|
||||||
debug_assert (candidates.size () == weights.size ());
|
|
||||||
|
|
||||||
// Use a dedicated, uniformly distributed field for sampling to avoid problematic corner case when accounts in the queue are very close together
|
|
||||||
auto search = bootstrap_ascending::generate_id ();
|
|
||||||
auto iter = priorities.get<tag_id> ().lower_bound (search);
|
|
||||||
if (iter == priorities.get<tag_id> ().end ())
|
|
||||||
{
|
|
||||||
iter = priorities.get<tag_id> ().begin ();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (check_timestamp (iter->account))
|
|
||||||
{
|
|
||||||
candidates.push_back (iter->account);
|
|
||||||
weights.push_back (iter->priority);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (candidates.empty ())
|
|
||||||
{
|
|
||||||
return { 0 }; // All sampled accounts are busy
|
|
||||||
}
|
|
||||||
|
|
||||||
std::discrete_distribution dist{ weights.begin (), weights.end () };
|
|
||||||
auto selection = dist (rng);
|
|
||||||
debug_assert (!weights.empty () && selection < weights.size ());
|
|
||||||
auto result = candidates[selection];
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool nano::bootstrap_ascending::account_sets::blocked (nano::account const & account) const
|
|
||||||
{
|
|
||||||
return blocking.get<tag_account> ().count (account) > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::size_t nano::bootstrap_ascending::account_sets::priority_size () const
|
|
||||||
{
|
|
||||||
return priorities.size ();
|
|
||||||
}
|
|
||||||
|
|
||||||
std::size_t nano::bootstrap_ascending::account_sets::blocked_size () const
|
|
||||||
{
|
|
||||||
return blocking.size ();
|
|
||||||
}
|
|
||||||
|
|
||||||
float nano::bootstrap_ascending::account_sets::priority (nano::account const & account) const
|
|
||||||
{
|
|
||||||
if (blocked (account))
|
|
||||||
{
|
|
||||||
return 0.0f;
|
|
||||||
}
|
|
||||||
auto existing = priorities.get<tag_account> ().find (account);
|
|
||||||
if (existing != priorities.get<tag_account> ().end ())
|
|
||||||
{
|
|
||||||
return existing->priority;
|
|
||||||
}
|
|
||||||
return account_sets::priority_cutoff;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto nano::bootstrap_ascending::account_sets::info () const -> info_t
|
|
||||||
{
|
|
||||||
return { blocking, priorities };
|
|
||||||
}
|
|
||||||
|
|
||||||
std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::account_sets::collect_container_info (const std::string & name)
|
|
||||||
{
|
|
||||||
auto composite = std::make_unique<container_info_composite> (name);
|
|
||||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "priorities", priorities.size (), sizeof (decltype (priorities)::value_type) }));
|
|
||||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocking", blocking.size (), sizeof (decltype (blocking)::value_type) }));
|
|
||||||
return composite;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* priority_entry
|
|
||||||
*/
|
|
||||||
|
|
||||||
nano::bootstrap_ascending::account_sets::priority_entry::priority_entry (nano::account account_a, float priority_a) :
|
|
||||||
account{ account_a },
|
|
||||||
priority{ priority_a }
|
|
||||||
{
|
|
||||||
id = nano::bootstrap_ascending::generate_id ();
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* bootstrap_ascending
|
|
||||||
*/
|
|
||||||
|
|
||||||
nano::bootstrap_ascending::bootstrap_ascending (nano::node_config & config_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a) :
|
|
||||||
config{ config_a },
|
|
||||||
network_consts{ config.network_params.network },
|
|
||||||
block_processor{ block_processor_a },
|
|
||||||
ledger{ ledger_a },
|
|
||||||
network{ network_a },
|
|
||||||
stats{ stat_a },
|
|
||||||
accounts{ stats },
|
|
||||||
iterator{ ledger.store },
|
|
||||||
throttle{ config.bootstrap_ascending.throttle_count },
|
|
||||||
limiter{ config.bootstrap_ascending.requests_limit, 1.0 },
|
|
||||||
database_limiter{ config.bootstrap_ascending.database_requests_limit, 1.0 }
|
|
||||||
{
|
|
||||||
// TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread
|
|
||||||
block_processor.batch_processed.add ([this] (auto const & batch) {
|
|
||||||
{
|
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
||||||
|
|
||||||
auto transaction = ledger.store.tx_begin_read ();
|
|
||||||
for (auto const & [result, block] : batch)
|
|
||||||
{
|
|
||||||
debug_assert (block != nullptr);
|
|
||||||
|
|
||||||
inspect (transaction, result, *block);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
condition.notify_all ();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
nano::bootstrap_ascending::~bootstrap_ascending ()
|
|
||||||
{
|
|
||||||
// All threads must be stopped before destruction
|
|
||||||
debug_assert (!thread.joinable ());
|
|
||||||
debug_assert (!timeout_thread.joinable ());
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::start ()
|
|
||||||
{
|
|
||||||
debug_assert (!thread.joinable ());
|
|
||||||
debug_assert (!timeout_thread.joinable ());
|
|
||||||
|
|
||||||
thread = std::thread ([this] () {
|
|
||||||
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
|
|
||||||
run ();
|
|
||||||
});
|
|
||||||
|
|
||||||
timeout_thread = std::thread ([this] () {
|
|
||||||
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
|
|
||||||
run_timeouts ();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::stop ()
|
|
||||||
{
|
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
|
||||||
stopped = true;
|
|
||||||
lock.unlock ();
|
|
||||||
condition.notify_all ();
|
|
||||||
nano::join_or_pass (thread);
|
|
||||||
nano::join_or_pass (timeout_thread);
|
|
||||||
}
|
|
||||||
|
|
||||||
nano::bootstrap_ascending::id_t nano::bootstrap_ascending::generate_id ()
|
|
||||||
{
|
|
||||||
id_t id;
|
|
||||||
nano::random_pool::generate_block (reinterpret_cast<uint8_t *> (&id), sizeof (id));
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::send (std::shared_ptr<nano::transport::channel> channel, async_tag tag)
|
|
||||||
{
|
|
||||||
debug_assert (tag.type == async_tag::query_type::blocks_by_hash || tag.type == async_tag::query_type::blocks_by_account);
|
|
||||||
|
|
||||||
nano::asc_pull_req request{ network_consts };
|
|
||||||
request.id = tag.id;
|
|
||||||
request.type = nano::asc_pull_type::blocks;
|
|
||||||
|
|
||||||
nano::asc_pull_req::blocks_payload request_payload;
|
|
||||||
request_payload.start = tag.start;
|
|
||||||
request_payload.count = config.bootstrap_ascending.pull_count;
|
|
||||||
request_payload.start_type = (tag.type == async_tag::query_type::blocks_by_hash) ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account;
|
|
||||||
|
|
||||||
request.payload = request_payload;
|
|
||||||
request.update_header ();
|
|
||||||
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out);
|
|
||||||
|
|
||||||
// TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests
|
|
||||||
channel->send (
|
|
||||||
request, nullptr,
|
|
||||||
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t nano::bootstrap_ascending::priority_size () const
|
|
||||||
{
|
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
||||||
return accounts.priority_size ();
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t nano::bootstrap_ascending::blocked_size () const
|
|
||||||
{
|
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
||||||
return accounts.blocked_size ();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Inspects a block that has been processed by the block processor
|
|
||||||
- Marks an account as blocked if the result code is gap source as there is no reason request additional blocks for this account until the dependency is resolved
|
|
||||||
- Marks an account as forwarded if it has been recently referenced by a block that has been inserted.
|
|
||||||
*/
|
|
||||||
void nano::bootstrap_ascending::inspect (nano::transaction const & tx, nano::process_return const & result, nano::block const & block)
|
|
||||||
{
|
|
||||||
auto const hash = block.hash ();
|
|
||||||
|
|
||||||
switch (result.code)
|
|
||||||
{
|
|
||||||
case nano::process_result::progress:
|
|
||||||
{
|
|
||||||
const auto account = ledger.account (tx, hash);
|
|
||||||
const auto is_send = ledger.is_send (tx, block);
|
|
||||||
|
|
||||||
// If we've inserted any block in to an account, unmark it as blocked
|
|
||||||
accounts.unblock (account);
|
|
||||||
accounts.priority_up (account);
|
|
||||||
accounts.timestamp (account, /* reset timestamp */ true);
|
|
||||||
|
|
||||||
if (is_send)
|
|
||||||
{
|
|
||||||
// TODO: Encapsulate this as a helper somewhere
|
|
||||||
nano::account destination{ 0 };
|
|
||||||
switch (block.type ())
|
|
||||||
{
|
|
||||||
case nano::block_type::send:
|
|
||||||
destination = block.destination ();
|
|
||||||
break;
|
|
||||||
case nano::block_type::state:
|
|
||||||
destination = block.link ().as_account ();
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
debug_assert (false, "unexpected block type");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (!destination.is_zero ())
|
|
||||||
{
|
|
||||||
accounts.unblock (destination, hash); // Unblocking automatically inserts account into priority set
|
|
||||||
accounts.priority_up (destination);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case nano::process_result::gap_source:
|
|
||||||
{
|
|
||||||
const auto account = block.previous ().is_zero () ? block.account () : ledger.account (tx, block.previous ());
|
|
||||||
const auto source = block.source ().is_zero () ? block.link ().as_block_hash () : block.source ();
|
|
||||||
|
|
||||||
// Mark account as blocked because it is missing the source block
|
|
||||||
accounts.block (account, source);
|
|
||||||
|
|
||||||
// TODO: Track stats
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case nano::process_result::old:
|
|
||||||
{
|
|
||||||
// TODO: Track stats
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case nano::process_result::gap_previous:
|
|
||||||
{
|
|
||||||
// TODO: Track stats
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default: // No need to handle other cases
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::wait_blockprocessor ()
|
|
||||||
{
|
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
|
||||||
while (!stopped && block_processor.half_full ())
|
|
||||||
{
|
|
||||||
condition.wait_for (lock, 500ms, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::wait_available_request ()
|
|
||||||
{
|
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
|
||||||
while (!stopped && !limiter.should_pass (1))
|
|
||||||
{
|
|
||||||
condition.wait_for (lock, 50ms, [this] () { return stopped; }); // Give it at least some time to cooldown to avoid hitting the limit too frequently
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::available_channel ()
|
|
||||||
{
|
|
||||||
auto channels = network.random_set (32, network_consts.bootstrap_protocol_version_min, /* include temporary channels */ true);
|
|
||||||
for (auto & channel : channels)
|
|
||||||
{
|
|
||||||
if (!channel->max (nano::transport::traffic_type::bootstrap))
|
|
||||||
{
|
|
||||||
return channel;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::wait_available_channel ()
|
|
||||||
{
|
|
||||||
std::shared_ptr<nano::transport::channel> channel;
|
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
|
||||||
while (!stopped && !(channel = available_channel ()))
|
|
||||||
{
|
|
||||||
condition.wait_for (lock, 100ms, [this] () { return stopped; });
|
|
||||||
}
|
|
||||||
return channel;
|
|
||||||
}
|
|
||||||
|
|
||||||
nano::account nano::bootstrap_ascending::available_account ()
|
|
||||||
{
|
|
||||||
{
|
|
||||||
auto account = accounts.next ();
|
|
||||||
if (!account.is_zero ())
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_priority);
|
|
||||||
return account;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (database_limiter.should_pass (1))
|
|
||||||
{
|
|
||||||
auto account = iterator.next ();
|
|
||||||
if (!account.is_zero ())
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_database);
|
|
||||||
return account;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_none);
|
|
||||||
return { 0 };
|
|
||||||
}
|
|
||||||
|
|
||||||
nano::account nano::bootstrap_ascending::wait_available_account ()
|
|
||||||
{
|
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
|
||||||
while (!stopped)
|
|
||||||
{
|
|
||||||
auto account = available_account ();
|
|
||||||
if (!account.is_zero ())
|
|
||||||
{
|
|
||||||
accounts.timestamp (account);
|
|
||||||
return account;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
condition.wait_for (lock, 100ms);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return { 0 };
|
|
||||||
}
|
|
||||||
|
|
||||||
bool nano::bootstrap_ascending::request (nano::account & account, std::shared_ptr<nano::transport::channel> & channel)
|
|
||||||
{
|
|
||||||
async_tag tag{};
|
|
||||||
tag.id = generate_id ();
|
|
||||||
tag.account = account;
|
|
||||||
tag.time = nano::milliseconds_since_epoch ();
|
|
||||||
|
|
||||||
// Check if the account picked has blocks, if it does, start the pull from the highest block
|
|
||||||
auto info = ledger.store.account.get (ledger.store.tx_begin_read (), account);
|
|
||||||
if (info)
|
|
||||||
{
|
|
||||||
tag.type = async_tag::query_type::blocks_by_hash;
|
|
||||||
tag.start = info->head;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
tag.type = async_tag::query_type::blocks_by_account;
|
|
||||||
tag.start = account;
|
|
||||||
}
|
|
||||||
|
|
||||||
on_request.notify (tag, channel);
|
|
||||||
|
|
||||||
track (tag);
|
|
||||||
send (channel, tag);
|
|
||||||
|
|
||||||
return true; // Request sent
|
|
||||||
}
|
|
||||||
|
|
||||||
bool nano::bootstrap_ascending::run_one ()
|
|
||||||
{
|
|
||||||
// Ensure there is enough space in blockprocessor for queuing new blocks
|
|
||||||
wait_blockprocessor ();
|
|
||||||
|
|
||||||
// Do not do too many requests in parallel, impose throttling
|
|
||||||
wait_available_request ();
|
|
||||||
|
|
||||||
// Waits for channel that is not full
|
|
||||||
auto channel = wait_available_channel ();
|
|
||||||
if (!channel)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Waits for account either from priority queue or database
|
|
||||||
auto account = wait_available_account ();
|
|
||||||
if (account.is_zero ())
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool success = request (account, channel);
|
|
||||||
return success;
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::throttle_if_needed (nano::unique_lock<nano::mutex> & lock)
|
|
||||||
{
|
|
||||||
debug_assert (lock.owns_lock ());
|
|
||||||
if (!iterator.warmup () && throttle.throttled ())
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::throttled);
|
|
||||||
condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::run ()
|
|
||||||
{
|
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
|
||||||
while (!stopped)
|
|
||||||
{
|
|
||||||
lock.unlock ();
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop);
|
|
||||||
run_one ();
|
|
||||||
lock.lock ();
|
|
||||||
throttle_if_needed (lock);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::run_timeouts ()
|
|
||||||
{
|
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
|
||||||
while (!stopped)
|
|
||||||
{
|
|
||||||
auto & tags_by_order = tags.get<tag_sequenced> ();
|
|
||||||
while (!tags_by_order.empty () && nano::time_difference (tags_by_order.front ().time, nano::milliseconds_since_epoch ()) > config.bootstrap_ascending.timeout)
|
|
||||||
{
|
|
||||||
auto tag = tags_by_order.front ();
|
|
||||||
tags_by_order.pop_front ();
|
|
||||||
on_timeout.notify (tag);
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout);
|
|
||||||
}
|
|
||||||
condition.wait_for (lock, 1s, [this] () { return stopped; });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::process (const nano::asc_pull_ack & message)
|
|
||||||
{
|
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
|
||||||
|
|
||||||
// Only process messages that have a known tag
|
|
||||||
auto & tags_by_id = tags.get<tag_id> ();
|
|
||||||
if (tags_by_id.count (message.id) > 0)
|
|
||||||
{
|
|
||||||
auto iterator = tags_by_id.find (message.id);
|
|
||||||
auto tag = *iterator;
|
|
||||||
tags_by_id.erase (iterator);
|
|
||||||
|
|
||||||
lock.unlock ();
|
|
||||||
|
|
||||||
on_reply.notify (tag);
|
|
||||||
condition.notify_all ();
|
|
||||||
std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::missing_tag);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::process (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::async_tag & tag)
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply);
|
|
||||||
|
|
||||||
auto result = verify (response, tag);
|
|
||||||
switch (result)
|
|
||||||
{
|
|
||||||
case verify_result::ok:
|
|
||||||
{
|
|
||||||
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ());
|
|
||||||
|
|
||||||
for (auto & block : response.blocks)
|
|
||||||
{
|
|
||||||
block_processor.add (block);
|
|
||||||
}
|
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
||||||
throttle.add (true);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case verify_result::nothing_new:
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new);
|
|
||||||
|
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
||||||
accounts.priority_down (tag.account);
|
|
||||||
throttle.add (false);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case verify_result::invalid:
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid);
|
|
||||||
// TODO: Log
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::process (const nano::asc_pull_ack::account_info_payload & response, const nano::bootstrap_ascending::async_tag & tag)
|
|
||||||
{
|
|
||||||
// TODO: Make use of account info
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::process (const nano::empty_payload & response, const nano::bootstrap_ascending::async_tag & tag)
|
|
||||||
{
|
|
||||||
// Should not happen
|
|
||||||
debug_assert (false, "empty payload");
|
|
||||||
}
|
|
||||||
|
|
||||||
nano::bootstrap_ascending::verify_result nano::bootstrap_ascending::verify (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::async_tag & tag) const
|
|
||||||
{
|
|
||||||
auto const & blocks = response.blocks;
|
|
||||||
|
|
||||||
if (blocks.empty ())
|
|
||||||
{
|
|
||||||
return verify_result::nothing_new;
|
|
||||||
}
|
|
||||||
if (blocks.size () == 1 && blocks.front ()->hash () == tag.start.as_block_hash ())
|
|
||||||
{
|
|
||||||
return verify_result::nothing_new;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto const & first = blocks.front ();
|
|
||||||
switch (tag.type)
|
|
||||||
{
|
|
||||||
case async_tag::query_type::blocks_by_hash:
|
|
||||||
{
|
|
||||||
if (first->hash () != tag.start.as_block_hash ())
|
|
||||||
{
|
|
||||||
// TODO: Stat & log
|
|
||||||
return verify_result::invalid;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case async_tag::query_type::blocks_by_account:
|
|
||||||
{
|
|
||||||
// Open & state blocks always contain account field
|
|
||||||
if (first->account () != tag.start.as_account ())
|
|
||||||
{
|
|
||||||
// TODO: Stat & log
|
|
||||||
return verify_result::invalid;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
return verify_result::invalid;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify blocks make a valid chain
|
|
||||||
nano::block_hash previous_hash = blocks.front ()->hash ();
|
|
||||||
for (int n = 1; n < blocks.size (); ++n)
|
|
||||||
{
|
|
||||||
auto & block = blocks[n];
|
|
||||||
if (block->previous () != previous_hash)
|
|
||||||
{
|
|
||||||
// TODO: Stat & log
|
|
||||||
return verify_result::invalid; // Blocks do not make a chain
|
|
||||||
}
|
|
||||||
previous_hash = block->hash ();
|
|
||||||
}
|
|
||||||
|
|
||||||
return verify_result::ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::bootstrap_ascending::track (async_tag const & tag)
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::track);
|
|
||||||
|
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
||||||
debug_assert (tags.get<tag_id> ().count (tag.id) == 0);
|
|
||||||
tags.get<tag_id> ().insert (tag);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto nano::bootstrap_ascending::info () const -> account_sets::info_t
|
|
||||||
{
|
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
||||||
return accounts.info ();
|
|
||||||
}
|
|
||||||
|
|
||||||
std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::collect_container_info (std::string const & name)
|
|
||||||
{
|
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
||||||
|
|
||||||
auto composite = std::make_unique<container_info_composite> (name);
|
|
||||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "tags", tags.size (), sizeof (decltype (tags)::value_type) }));
|
|
||||||
composite->add_component (accounts.collect_container_info ("accounts"));
|
|
||||||
return composite;
|
|
||||||
}
|
|
||||||
|
|
@ -1,344 +0,0 @@
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include <nano/lib/observer_set.hpp>
|
|
||||||
#include <nano/lib/timer.hpp>
|
|
||||||
#include <nano/node/bandwidth_limiter.hpp>
|
|
||||||
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
|
||||||
#include <nano/node/bootstrap/bootstrap_config.hpp>
|
|
||||||
#include <nano/node/bootstrap/bootstrap_server.hpp>
|
|
||||||
|
|
||||||
#include <boost/multi_index/hashed_index.hpp>
|
|
||||||
#include <boost/multi_index/mem_fun.hpp>
|
|
||||||
#include <boost/multi_index/member.hpp>
|
|
||||||
#include <boost/multi_index/ordered_index.hpp>
|
|
||||||
#include <boost/multi_index/random_access_index.hpp>
|
|
||||||
#include <boost/multi_index/sequenced_index.hpp>
|
|
||||||
#include <boost/multi_index_container.hpp>
|
|
||||||
|
|
||||||
#include <random>
|
|
||||||
#include <thread>
|
|
||||||
|
|
||||||
namespace mi = boost::multi_index;
|
|
||||||
|
|
||||||
namespace nano
|
|
||||||
{
|
|
||||||
class block_processor;
|
|
||||||
class ledger;
|
|
||||||
class network;
|
|
||||||
class node_config;
|
|
||||||
|
|
||||||
namespace transport
|
|
||||||
{
|
|
||||||
class channel;
|
|
||||||
}
|
|
||||||
|
|
||||||
class bootstrap_ascending
|
|
||||||
{
|
|
||||||
using id_t = uint64_t;
|
|
||||||
|
|
||||||
// Class used to throttle the ascending bootstrapper once it reaches a steady state
|
|
||||||
// Tracks verify_result samples and signals throttling if no tracked samples have gotten results
|
|
||||||
class throttle
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
// Initialized with all true samples
|
|
||||||
explicit throttle (size_t size);
|
|
||||||
bool throttled () const;
|
|
||||||
void add (bool success);
|
|
||||||
|
|
||||||
private:
|
|
||||||
// Rolling count of true samples in the sample buffer
|
|
||||||
size_t successes;
|
|
||||||
// Circular buffer that tracks sample results. True when something was retrieved, false otherwise
|
|
||||||
boost::circular_buffer<bool> samples;
|
|
||||||
};
|
|
||||||
|
|
||||||
public:
|
|
||||||
bootstrap_ascending (nano::node_config &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &);
|
|
||||||
~bootstrap_ascending ();
|
|
||||||
|
|
||||||
void start ();
|
|
||||||
void stop ();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process `asc_pull_ack` message coming from network
|
|
||||||
*/
|
|
||||||
void process (nano::asc_pull_ack const & message);
|
|
||||||
|
|
||||||
public: // Container info
|
|
||||||
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
|
|
||||||
size_t blocked_size () const;
|
|
||||||
size_t priority_size () const;
|
|
||||||
|
|
||||||
private: // Dependencies
|
|
||||||
nano::node_config & config;
|
|
||||||
nano::network_constants & network_consts;
|
|
||||||
nano::block_processor & block_processor;
|
|
||||||
nano::ledger & ledger;
|
|
||||||
nano::network & network;
|
|
||||||
nano::stats & stats;
|
|
||||||
|
|
||||||
public: // async_tag
|
|
||||||
struct async_tag
|
|
||||||
{
|
|
||||||
enum class query_type
|
|
||||||
{
|
|
||||||
invalid = 0, // Default initialization
|
|
||||||
blocks_by_hash,
|
|
||||||
blocks_by_account,
|
|
||||||
// TODO: account_info,
|
|
||||||
};
|
|
||||||
|
|
||||||
query_type type{ query_type::invalid };
|
|
||||||
id_t id{ 0 };
|
|
||||||
nano::hash_or_account start{ 0 };
|
|
||||||
nano::millis_t time{ 0 };
|
|
||||||
nano::account account{ 0 };
|
|
||||||
};
|
|
||||||
|
|
||||||
public: // Events
|
|
||||||
nano::observer_set<async_tag const &, std::shared_ptr<nano::transport::channel> &> on_request;
|
|
||||||
nano::observer_set<async_tag const &> on_reply;
|
|
||||||
nano::observer_set<async_tag const &> on_timeout;
|
|
||||||
|
|
||||||
private:
|
|
||||||
/* Inspects a block that has been processed by the block processor */
|
|
||||||
void inspect (nano::transaction const &, nano::process_return const & result, nano::block const & block);
|
|
||||||
|
|
||||||
void throttle_if_needed (nano::unique_lock<nano::mutex> & lock);
|
|
||||||
void run ();
|
|
||||||
bool run_one ();
|
|
||||||
void run_timeouts ();
|
|
||||||
|
|
||||||
/* Limits the number of requests per second we make */
|
|
||||||
void wait_available_request ();
|
|
||||||
/* Throttles requesting new blocks, not to overwhelm blockprocessor */
|
|
||||||
void wait_blockprocessor ();
|
|
||||||
/* Waits for channel with free capacity for bootstrap messages */
|
|
||||||
std::shared_ptr<nano::transport::channel> wait_available_channel ();
|
|
||||||
std::shared_ptr<nano::transport::channel> available_channel ();
|
|
||||||
/* Waits until a suitable account outside of cool down period is available */
|
|
||||||
nano::account available_account ();
|
|
||||||
nano::account wait_available_account ();
|
|
||||||
|
|
||||||
bool request (nano::account &, std::shared_ptr<nano::transport::channel> &);
|
|
||||||
void send (std::shared_ptr<nano::transport::channel>, async_tag tag);
|
|
||||||
void track (async_tag const & tag);
|
|
||||||
|
|
||||||
void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag);
|
|
||||||
void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag);
|
|
||||||
void process (nano::empty_payload const & response, async_tag const & tag);
|
|
||||||
|
|
||||||
enum class verify_result
|
|
||||||
{
|
|
||||||
ok,
|
|
||||||
nothing_new,
|
|
||||||
invalid,
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Verifies whether the received response is valid. Returns:
|
|
||||||
* - invalid: when received blocks do not correspond to requested hash/account or they do not make a valid chain
|
|
||||||
* - nothing_new: when received response indicates that the account chain does not have more blocks
|
|
||||||
* - ok: otherwise, if all checks pass
|
|
||||||
*/
|
|
||||||
verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const;
|
|
||||||
|
|
||||||
static id_t generate_id ();
|
|
||||||
|
|
||||||
public: // account_sets
|
|
||||||
/** This class tracks accounts various account sets which are shared among the multiple bootstrap threads */
|
|
||||||
class account_sets
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
explicit account_sets (nano::stats &, nano::account_sets_config config = {});
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If an account is not blocked, increase its priority.
|
|
||||||
* If the account does not exist in priority set and is not blocked, inserts a new entry.
|
|
||||||
* Current implementation increases priority by 1.0f each increment
|
|
||||||
*/
|
|
||||||
void priority_up (nano::account const & account);
|
|
||||||
/**
|
|
||||||
* Decreases account priority
|
|
||||||
* Current implementation divides priority by 2.0f and saturates down to 1.0f.
|
|
||||||
*/
|
|
||||||
void priority_down (nano::account const & account);
|
|
||||||
void block (nano::account const & account, nano::block_hash const & dependency);
|
|
||||||
void unblock (nano::account const & account, std::optional<nano::block_hash> const & hash = std::nullopt);
|
|
||||||
void timestamp (nano::account const & account, bool reset = false);
|
|
||||||
|
|
||||||
nano::account next ();
|
|
||||||
|
|
||||||
public:
|
|
||||||
bool blocked (nano::account const & account) const;
|
|
||||||
std::size_t priority_size () const;
|
|
||||||
std::size_t blocked_size () const;
|
|
||||||
/**
|
|
||||||
* Accounts in the ledger but not in priority list are assumed priority 1.0f
|
|
||||||
* Blocked accounts are assumed priority 0.0f
|
|
||||||
*/
|
|
||||||
float priority (nano::account const & account) const;
|
|
||||||
|
|
||||||
public: // Container info
|
|
||||||
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
|
|
||||||
|
|
||||||
private:
|
|
||||||
void trim_overflow ();
|
|
||||||
bool check_timestamp (nano::account const & account) const;
|
|
||||||
|
|
||||||
private: // Dependencies
|
|
||||||
nano::stats & stats;
|
|
||||||
|
|
||||||
private:
|
|
||||||
struct priority_entry
|
|
||||||
{
|
|
||||||
nano::account account{ 0 };
|
|
||||||
float priority{ 0 };
|
|
||||||
nano::millis_t timestamp{ 0 };
|
|
||||||
id_t id{ 0 }; // Uniformly distributed, used for random querying
|
|
||||||
|
|
||||||
priority_entry (nano::account account, float priority);
|
|
||||||
};
|
|
||||||
|
|
||||||
struct blocking_entry
|
|
||||||
{
|
|
||||||
nano::account account{ 0 };
|
|
||||||
nano::block_hash dependency{ 0 };
|
|
||||||
priority_entry original_entry{ 0, 0 };
|
|
||||||
|
|
||||||
float priority () const
|
|
||||||
{
|
|
||||||
return original_entry.priority;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// clang-format off
|
|
||||||
class tag_account {};
|
|
||||||
class tag_priority {};
|
|
||||||
class tag_sequenced {};
|
|
||||||
class tag_id {};
|
|
||||||
|
|
||||||
// Tracks the ongoing account priorities
|
|
||||||
// This only stores account priorities > 1.0f.
|
|
||||||
using ordered_priorities = boost::multi_index_container<priority_entry,
|
|
||||||
mi::indexed_by<
|
|
||||||
mi::sequenced<mi::tag<tag_sequenced>>,
|
|
||||||
mi::ordered_unique<mi::tag<tag_account>,
|
|
||||||
mi::member<priority_entry, nano::account, &priority_entry::account>>,
|
|
||||||
mi::ordered_non_unique<mi::tag<tag_priority>,
|
|
||||||
mi::member<priority_entry, float, &priority_entry::priority>>,
|
|
||||||
mi::ordered_unique<mi::tag<tag_id>,
|
|
||||||
mi::member<priority_entry, bootstrap_ascending::id_t, &priority_entry::id>>
|
|
||||||
>>;
|
|
||||||
|
|
||||||
// A blocked account is an account that has failed to insert a new block because the source block is not currently present in the ledger
|
|
||||||
// An account is unblocked once it has a block successfully inserted
|
|
||||||
using ordered_blocking = boost::multi_index_container<blocking_entry,
|
|
||||||
mi::indexed_by<
|
|
||||||
mi::sequenced<mi::tag<tag_sequenced>>,
|
|
||||||
mi::ordered_unique<mi::tag<tag_account>,
|
|
||||||
mi::member<blocking_entry, nano::account, &blocking_entry::account>>,
|
|
||||||
mi::ordered_non_unique<mi::tag<tag_priority>,
|
|
||||||
mi::const_mem_fun<blocking_entry, float, &blocking_entry::priority>>
|
|
||||||
>>;
|
|
||||||
// clang-format on
|
|
||||||
|
|
||||||
ordered_priorities priorities;
|
|
||||||
ordered_blocking blocking;
|
|
||||||
|
|
||||||
std::default_random_engine rng;
|
|
||||||
|
|
||||||
private:
|
|
||||||
nano::account_sets_config config;
|
|
||||||
|
|
||||||
public: // Consts
|
|
||||||
static float constexpr priority_initial = 8.0f;
|
|
||||||
static float constexpr priority_increase = 2.0f;
|
|
||||||
static float constexpr priority_decrease = 0.5f;
|
|
||||||
static float constexpr priority_max = 32.0f;
|
|
||||||
static float constexpr priority_cutoff = 1.0f;
|
|
||||||
|
|
||||||
public:
|
|
||||||
using info_t = std::tuple<decltype (blocking), decltype (priorities)>; // <blocking, priorities>
|
|
||||||
info_t info () const;
|
|
||||||
};
|
|
||||||
|
|
||||||
account_sets::info_t info () const;
|
|
||||||
|
|
||||||
private: // Database iterators
|
|
||||||
class database_iterator
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
enum class table_type
|
|
||||||
{
|
|
||||||
account,
|
|
||||||
pending
|
|
||||||
};
|
|
||||||
|
|
||||||
explicit database_iterator (nano::store & store, table_type);
|
|
||||||
nano::account operator* () const;
|
|
||||||
void next (nano::transaction & tx);
|
|
||||||
|
|
||||||
private:
|
|
||||||
nano::store & store;
|
|
||||||
nano::account current{ 0 };
|
|
||||||
const table_type table;
|
|
||||||
};
|
|
||||||
|
|
||||||
class buffered_iterator
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
explicit buffered_iterator (nano::store & store);
|
|
||||||
nano::account operator* () const;
|
|
||||||
nano::account next ();
|
|
||||||
// Indicates if a full ledger iteration has taken place e.g. warmed up
|
|
||||||
bool warmup () const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
void fill ();
|
|
||||||
|
|
||||||
private:
|
|
||||||
nano::store & store;
|
|
||||||
std::deque<nano::account> buffer;
|
|
||||||
bool warmup_m{ true };
|
|
||||||
|
|
||||||
database_iterator accounts_iterator;
|
|
||||||
database_iterator pending_iterator;
|
|
||||||
|
|
||||||
static std::size_t constexpr size = 1024;
|
|
||||||
};
|
|
||||||
|
|
||||||
private:
|
|
||||||
account_sets accounts;
|
|
||||||
buffered_iterator iterator;
|
|
||||||
throttle throttle;
|
|
||||||
|
|
||||||
// clang-format off
|
|
||||||
class tag_sequenced {};
|
|
||||||
class tag_id {};
|
|
||||||
class tag_account {};
|
|
||||||
|
|
||||||
using ordered_tags = boost::multi_index_container<async_tag,
|
|
||||||
mi::indexed_by<
|
|
||||||
mi::sequenced<mi::tag<tag_sequenced>>,
|
|
||||||
mi::hashed_unique<mi::tag<tag_id>,
|
|
||||||
mi::member<async_tag, id_t, &async_tag::id>>,
|
|
||||||
mi::hashed_non_unique<mi::tag<tag_account>,
|
|
||||||
mi::member<async_tag, nano::account , &async_tag::account>>
|
|
||||||
>>;
|
|
||||||
// clang-format on
|
|
||||||
ordered_tags tags;
|
|
||||||
|
|
||||||
nano::bandwidth_limiter limiter;
|
|
||||||
// Requests for accounts from database have much lower hitrate and could introduce strain on the network
|
|
||||||
// A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue
|
|
||||||
nano::bandwidth_limiter database_limiter;
|
|
||||||
|
|
||||||
bool stopped{ false };
|
|
||||||
mutable nano::mutex mutex;
|
|
||||||
mutable nano::condition_variable condition;
|
|
||||||
std::thread thread;
|
|
||||||
std::thread timeout_thread;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
254
nano/node/bootstrap_ascending/account_sets.cpp
Normal file
254
nano/node/bootstrap_ascending/account_sets.cpp
Normal file
|
|
@ -0,0 +1,254 @@
|
||||||
|
#include <nano/lib/stats.hpp>
|
||||||
|
#include <nano/lib/utility.hpp>
|
||||||
|
#include <nano/node/bootstrap/bootstrap_config.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/account_sets.hpp>
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <memory>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
/*
|
||||||
|
* account_sets
|
||||||
|
*/
|
||||||
|
|
||||||
|
nano::bootstrap_ascending::account_sets::account_sets (nano::stats & stats_a, nano::account_sets_config config_a) :
|
||||||
|
stats{ stats_a },
|
||||||
|
config{ std::move (config_a) }
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::account_sets::priority_up (nano::account const & account)
|
||||||
|
{
|
||||||
|
if (!blocked (account))
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize);
|
||||||
|
|
||||||
|
auto iter = priorities.get<tag_account> ().find (account);
|
||||||
|
if (iter != priorities.get<tag_account> ().end ())
|
||||||
|
{
|
||||||
|
priorities.get<tag_account> ().modify (iter, [] (auto & val) {
|
||||||
|
val.priority = std::min ((val.priority * account_sets::priority_increase), account_sets::priority_max);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_insert);
|
||||||
|
|
||||||
|
trim_overflow ();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize_failed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::account_sets::priority_down (nano::account const & account)
|
||||||
|
{
|
||||||
|
auto iter = priorities.get<tag_account> ().find (account);
|
||||||
|
if (iter != priorities.get<tag_account> ().end ())
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize);
|
||||||
|
|
||||||
|
auto priority_new = iter->priority - account_sets::priority_decrease;
|
||||||
|
if (priority_new <= account_sets::priority_cutoff)
|
||||||
|
{
|
||||||
|
priorities.get<tag_account> ().erase (iter);
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_threshold);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
priorities.get<tag_account> ().modify (iter, [priority_new] (auto & val) {
|
||||||
|
val.priority = priority_new;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize_failed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::account_sets::block (nano::account const & account, nano::block_hash const & dependency)
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::block);
|
||||||
|
|
||||||
|
auto existing = priorities.get<tag_account> ().find (account);
|
||||||
|
auto entry = existing == priorities.get<tag_account> ().end () ? priority_entry{ 0, 0 } : *existing;
|
||||||
|
|
||||||
|
priorities.get<tag_account> ().erase (account);
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_block);
|
||||||
|
|
||||||
|
blocking.get<tag_account> ().insert ({ account, dependency, entry });
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_insert);
|
||||||
|
|
||||||
|
trim_overflow ();
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::account_sets::unblock (nano::account const & account, std::optional<nano::block_hash> const & hash)
|
||||||
|
{
|
||||||
|
// Unblock only if the dependency is fulfilled
|
||||||
|
auto existing = blocking.get<tag_account> ().find (account);
|
||||||
|
if (existing != blocking.get<tag_account> ().end () && (!hash || existing->dependency == *hash))
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::unblock);
|
||||||
|
|
||||||
|
debug_assert (priorities.get<tag_account> ().count (account) == 0);
|
||||||
|
if (!existing->original_entry.account.is_zero ())
|
||||||
|
{
|
||||||
|
debug_assert (existing->original_entry.account == account);
|
||||||
|
priorities.get<tag_account> ().insert (existing->original_entry);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
|
||||||
|
}
|
||||||
|
blocking.get<tag_account> ().erase (account);
|
||||||
|
|
||||||
|
trim_overflow ();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::unblock_failed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::account_sets::timestamp (const nano::account & account, bool reset)
|
||||||
|
{
|
||||||
|
const nano::millis_t tstamp = reset ? 0 : nano::milliseconds_since_epoch ();
|
||||||
|
|
||||||
|
auto iter = priorities.get<tag_account> ().find (account);
|
||||||
|
if (iter != priorities.get<tag_account> ().end ())
|
||||||
|
{
|
||||||
|
priorities.get<tag_account> ().modify (iter, [tstamp] (auto & entry) {
|
||||||
|
entry.timestamp = tstamp;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nano::bootstrap_ascending::account_sets::check_timestamp (const nano::account & account) const
|
||||||
|
{
|
||||||
|
auto iter = priorities.get<tag_account> ().find (account);
|
||||||
|
if (iter != priorities.get<tag_account> ().end ())
|
||||||
|
{
|
||||||
|
if (nano::milliseconds_since_epoch () - iter->timestamp < config.cooldown)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::account_sets::trim_overflow ()
|
||||||
|
{
|
||||||
|
if (priorities.size () > config.priorities_max)
|
||||||
|
{
|
||||||
|
// Evict the lowest priority entry
|
||||||
|
priorities.get<tag_priority> ().erase (priorities.get<tag_priority> ().begin ());
|
||||||
|
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_overflow);
|
||||||
|
}
|
||||||
|
if (blocking.size () > config.blocking_max)
|
||||||
|
{
|
||||||
|
// Evict the lowest priority entry
|
||||||
|
blocking.get<tag_priority> ().erase (blocking.get<tag_priority> ().begin ());
|
||||||
|
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_erase_overflow);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nano::account nano::bootstrap_ascending::account_sets::next ()
|
||||||
|
{
|
||||||
|
if (priorities.empty ())
|
||||||
|
{
|
||||||
|
return { 0 };
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<float> weights;
|
||||||
|
std::vector<nano::account> candidates;
|
||||||
|
|
||||||
|
int iterations = 0;
|
||||||
|
while (candidates.size () < config.consideration_count && iterations++ < config.consideration_count * 10)
|
||||||
|
{
|
||||||
|
debug_assert (candidates.size () == weights.size ());
|
||||||
|
|
||||||
|
// Use a dedicated, uniformly distributed field for sampling to avoid problematic corner case when accounts in the queue are very close together
|
||||||
|
auto search = nano::bootstrap_ascending::generate_id ();
|
||||||
|
auto iter = priorities.get<tag_id> ().lower_bound (search);
|
||||||
|
if (iter == priorities.get<tag_id> ().end ())
|
||||||
|
{
|
||||||
|
iter = priorities.get<tag_id> ().begin ();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (check_timestamp (iter->account))
|
||||||
|
{
|
||||||
|
candidates.push_back (iter->account);
|
||||||
|
weights.push_back (iter->priority);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (candidates.empty ())
|
||||||
|
{
|
||||||
|
return { 0 }; // All sampled accounts are busy
|
||||||
|
}
|
||||||
|
|
||||||
|
std::discrete_distribution dist{ weights.begin (), weights.end () };
|
||||||
|
auto selection = dist (rng);
|
||||||
|
debug_assert (!weights.empty () && selection < weights.size ());
|
||||||
|
auto result = candidates[selection];
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nano::bootstrap_ascending::account_sets::blocked (nano::account const & account) const
|
||||||
|
{
|
||||||
|
return blocking.get<tag_account> ().count (account) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::size_t nano::bootstrap_ascending::account_sets::priority_size () const
|
||||||
|
{
|
||||||
|
return priorities.size ();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::size_t nano::bootstrap_ascending::account_sets::blocked_size () const
|
||||||
|
{
|
||||||
|
return blocking.size ();
|
||||||
|
}
|
||||||
|
|
||||||
|
float nano::bootstrap_ascending::account_sets::priority (nano::account const & account) const
|
||||||
|
{
|
||||||
|
if (blocked (account))
|
||||||
|
{
|
||||||
|
return 0.0f;
|
||||||
|
}
|
||||||
|
auto existing = priorities.get<tag_account> ().find (account);
|
||||||
|
if (existing != priorities.get<tag_account> ().end ())
|
||||||
|
{
|
||||||
|
return existing->priority;
|
||||||
|
}
|
||||||
|
return account_sets::priority_cutoff;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto nano::bootstrap_ascending::account_sets::info () const -> nano::bootstrap_ascending::account_sets::info_t
|
||||||
|
{
|
||||||
|
return { blocking, priorities };
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::account_sets::collect_container_info (const std::string & name)
|
||||||
|
{
|
||||||
|
auto composite = std::make_unique<container_info_composite> (name);
|
||||||
|
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "priorities", priorities.size (), sizeof (decltype (priorities)::value_type) }));
|
||||||
|
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocking", blocking.size (), sizeof (decltype (blocking)::value_type) }));
|
||||||
|
return composite;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* priority_entry
|
||||||
|
*/
|
||||||
|
|
||||||
|
nano::bootstrap_ascending::account_sets::priority_entry::priority_entry (nano::account account_a, float priority_a) :
|
||||||
|
account{ account_a },
|
||||||
|
priority{ priority_a }
|
||||||
|
{
|
||||||
|
id = nano::bootstrap_ascending::generate_id ();
|
||||||
|
}
|
||||||
142
nano/node/bootstrap_ascending/account_sets.hpp
Normal file
142
nano/node/bootstrap_ascending/account_sets.hpp
Normal file
|
|
@ -0,0 +1,142 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <nano/lib/numbers.hpp>
|
||||||
|
#include <nano/node/bootstrap/bootstrap_config.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/common.hpp>
|
||||||
|
|
||||||
|
#include <boost/multi_index/hashed_index.hpp>
|
||||||
|
#include <boost/multi_index/mem_fun.hpp>
|
||||||
|
#include <boost/multi_index/member.hpp>
|
||||||
|
#include <boost/multi_index/ordered_index.hpp>
|
||||||
|
#include <boost/multi_index/random_access_index.hpp>
|
||||||
|
#include <boost/multi_index/sequenced_index.hpp>
|
||||||
|
#include <boost/multi_index_container.hpp>
|
||||||
|
|
||||||
|
#include <random>
|
||||||
|
|
||||||
|
namespace mi = boost::multi_index;
|
||||||
|
|
||||||
|
namespace nano
|
||||||
|
{
|
||||||
|
class stats;
|
||||||
|
|
||||||
|
namespace bootstrap_ascending
|
||||||
|
{
|
||||||
|
/** This class tracks accounts various account sets which are shared among the multiple bootstrap threads */
|
||||||
|
class account_sets
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit account_sets (nano::stats &, nano::account_sets_config config = {});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If an account is not blocked, increase its priority.
|
||||||
|
* If the account does not exist in priority set and is not blocked, inserts a new entry.
|
||||||
|
* Current implementation increases priority by 1.0f each increment
|
||||||
|
*/
|
||||||
|
void priority_up (nano::account const & account);
|
||||||
|
/**
|
||||||
|
* Decreases account priority
|
||||||
|
* Current implementation divides priority by 2.0f and saturates down to 1.0f.
|
||||||
|
*/
|
||||||
|
void priority_down (nano::account const & account);
|
||||||
|
void block (nano::account const & account, nano::block_hash const & dependency);
|
||||||
|
void unblock (nano::account const & account, std::optional<nano::block_hash> const & hash = std::nullopt);
|
||||||
|
void timestamp (nano::account const & account, bool reset = false);
|
||||||
|
|
||||||
|
nano::account next ();
|
||||||
|
|
||||||
|
public:
|
||||||
|
bool blocked (nano::account const & account) const;
|
||||||
|
std::size_t priority_size () const;
|
||||||
|
std::size_t blocked_size () const;
|
||||||
|
/**
|
||||||
|
* Accounts in the ledger but not in priority list are assumed priority 1.0f
|
||||||
|
* Blocked accounts are assumed priority 0.0f
|
||||||
|
*/
|
||||||
|
float priority (nano::account const & account) const;
|
||||||
|
|
||||||
|
public: // Container info
|
||||||
|
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
|
||||||
|
|
||||||
|
private:
|
||||||
|
void trim_overflow ();
|
||||||
|
bool check_timestamp (nano::account const & account) const;
|
||||||
|
|
||||||
|
private: // Dependencies
|
||||||
|
nano::stats & stats;
|
||||||
|
|
||||||
|
private:
|
||||||
|
struct priority_entry
|
||||||
|
{
|
||||||
|
nano::account account{ 0 };
|
||||||
|
float priority{ 0 };
|
||||||
|
nano::millis_t timestamp{ 0 };
|
||||||
|
nano::bootstrap_ascending::id_t id{ 0 }; // Uniformly distributed, used for random querying
|
||||||
|
|
||||||
|
priority_entry (nano::account account, float priority);
|
||||||
|
};
|
||||||
|
|
||||||
|
struct blocking_entry
|
||||||
|
{
|
||||||
|
nano::account account{ 0 };
|
||||||
|
nano::block_hash dependency{ 0 };
|
||||||
|
priority_entry original_entry{ 0, 0 };
|
||||||
|
|
||||||
|
float priority () const
|
||||||
|
{
|
||||||
|
return original_entry.priority;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// clang-format off
|
||||||
|
class tag_account {};
|
||||||
|
class tag_priority {};
|
||||||
|
class tag_sequenced {};
|
||||||
|
class tag_id {};
|
||||||
|
|
||||||
|
// Tracks the ongoing account priorities
|
||||||
|
// This only stores account priorities > 1.0f.
|
||||||
|
using ordered_priorities = boost::multi_index_container<priority_entry,
|
||||||
|
mi::indexed_by<
|
||||||
|
mi::sequenced<mi::tag<tag_sequenced>>,
|
||||||
|
mi::ordered_unique<mi::tag<tag_account>,
|
||||||
|
mi::member<priority_entry, nano::account, &priority_entry::account>>,
|
||||||
|
mi::ordered_non_unique<mi::tag<tag_priority>,
|
||||||
|
mi::member<priority_entry, float, &priority_entry::priority>>,
|
||||||
|
mi::ordered_unique<mi::tag<tag_id>,
|
||||||
|
mi::member<priority_entry, nano::bootstrap_ascending::id_t, &priority_entry::id>>
|
||||||
|
>>;
|
||||||
|
|
||||||
|
// A blocked account is an account that has failed to insert a new block because the source block is not currently present in the ledger
|
||||||
|
// An account is unblocked once it has a block successfully inserted
|
||||||
|
using ordered_blocking = boost::multi_index_container<blocking_entry,
|
||||||
|
mi::indexed_by<
|
||||||
|
mi::sequenced<mi::tag<tag_sequenced>>,
|
||||||
|
mi::ordered_unique<mi::tag<tag_account>,
|
||||||
|
mi::member<blocking_entry, nano::account, &blocking_entry::account>>,
|
||||||
|
mi::ordered_non_unique<mi::tag<tag_priority>,
|
||||||
|
mi::const_mem_fun<blocking_entry, float, &blocking_entry::priority>>
|
||||||
|
>>;
|
||||||
|
// clang-format on
|
||||||
|
|
||||||
|
ordered_priorities priorities;
|
||||||
|
ordered_blocking blocking;
|
||||||
|
|
||||||
|
std::default_random_engine rng;
|
||||||
|
|
||||||
|
private:
|
||||||
|
nano::account_sets_config config;
|
||||||
|
|
||||||
|
public: // Consts
|
||||||
|
static float constexpr priority_initial = 8.0f;
|
||||||
|
static float constexpr priority_increase = 2.0f;
|
||||||
|
static float constexpr priority_decrease = 0.5f;
|
||||||
|
static float constexpr priority_max = 32.0f;
|
||||||
|
static float constexpr priority_cutoff = 1.0f;
|
||||||
|
|
||||||
|
public:
|
||||||
|
using info_t = std::tuple<decltype (blocking), decltype (priorities)>; // <blocking, priorities>
|
||||||
|
info_t info () const;
|
||||||
|
};
|
||||||
|
} // bootstrap_ascending
|
||||||
|
} // nano
|
||||||
16
nano/node/bootstrap_ascending/common.hpp
Normal file
16
nano/node/bootstrap_ascending/common.hpp
Normal file
|
|
@ -0,0 +1,16 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <nano/crypto_lib/random_pool.hpp>
|
||||||
|
|
||||||
|
#include <cstdlib>
|
||||||
|
|
||||||
|
namespace nano::bootstrap_ascending
|
||||||
|
{
|
||||||
|
using id_t = uint64_t;
|
||||||
|
static nano::bootstrap_ascending::id_t generate_id ()
|
||||||
|
{
|
||||||
|
nano::bootstrap_ascending::id_t id;
|
||||||
|
nano::random_pool::generate_block (reinterpret_cast<uint8_t *> (&id), sizeof (id));
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
} // nano::bootstrap_ascending
|
||||||
119
nano/node/bootstrap_ascending/iterators.cpp
Normal file
119
nano/node/bootstrap_ascending/iterators.cpp
Normal file
|
|
@ -0,0 +1,119 @@
|
||||||
|
#include <nano/lib/utility.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/iterators.hpp>
|
||||||
|
#include <nano/secure/common.hpp>
|
||||||
|
#include <nano/secure/store.hpp>
|
||||||
|
|
||||||
|
/*
|
||||||
|
* database_iterator
|
||||||
|
*/
|
||||||
|
|
||||||
|
nano::bootstrap_ascending::database_iterator::database_iterator (nano::store & store_a, table_type table_a) :
|
||||||
|
store{ store_a },
|
||||||
|
table{ table_a }
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
nano::account nano::bootstrap_ascending::database_iterator::operator* () const
|
||||||
|
{
|
||||||
|
return current;
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::database_iterator::next (nano::transaction & tx)
|
||||||
|
{
|
||||||
|
switch (table)
|
||||||
|
{
|
||||||
|
case table_type::account:
|
||||||
|
{
|
||||||
|
auto i = current.number () + 1;
|
||||||
|
auto item = store.account.begin (tx, i);
|
||||||
|
if (item != store.account.end ())
|
||||||
|
{
|
||||||
|
current = item->first;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
current = { 0 };
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case table_type::pending:
|
||||||
|
{
|
||||||
|
auto i = current.number () + 1;
|
||||||
|
auto item = store.pending.begin (tx, nano::pending_key{ i, 0 });
|
||||||
|
if (item != store.pending.end ())
|
||||||
|
{
|
||||||
|
current = item->first.account;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
current = { 0 };
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* buffered_iterator
|
||||||
|
*/
|
||||||
|
|
||||||
|
nano::bootstrap_ascending::buffered_iterator::buffered_iterator (nano::store & store_a) :
|
||||||
|
store{ store_a },
|
||||||
|
accounts_iterator{ store, database_iterator::table_type::account },
|
||||||
|
pending_iterator{ store, database_iterator::table_type::pending }
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
nano::account nano::bootstrap_ascending::buffered_iterator::operator* () const
|
||||||
|
{
|
||||||
|
return !buffer.empty () ? buffer.front () : nano::account{ 0 };
|
||||||
|
}
|
||||||
|
|
||||||
|
nano::account nano::bootstrap_ascending::buffered_iterator::next ()
|
||||||
|
{
|
||||||
|
if (!buffer.empty ())
|
||||||
|
{
|
||||||
|
buffer.pop_front ();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
fill ();
|
||||||
|
}
|
||||||
|
|
||||||
|
return *(*this);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nano::bootstrap_ascending::buffered_iterator::warmup () const
|
||||||
|
{
|
||||||
|
return warmup_m;
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::buffered_iterator::fill ()
|
||||||
|
{
|
||||||
|
debug_assert (buffer.empty ());
|
||||||
|
|
||||||
|
// Fill half from accounts table and half from pending table
|
||||||
|
auto transaction = store.tx_begin_read ();
|
||||||
|
|
||||||
|
for (int n = 0; n < size / 2; ++n)
|
||||||
|
{
|
||||||
|
accounts_iterator.next (transaction);
|
||||||
|
if (!(*accounts_iterator).is_zero ())
|
||||||
|
{
|
||||||
|
buffer.push_back (*accounts_iterator);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int n = 0; n < size / 2; ++n)
|
||||||
|
{
|
||||||
|
pending_iterator.next (transaction);
|
||||||
|
if (!(*pending_iterator).is_zero ())
|
||||||
|
{
|
||||||
|
buffer.push_back (*pending_iterator);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
warmup_m = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
56
nano/node/bootstrap_ascending/iterators.hpp
Normal file
56
nano/node/bootstrap_ascending/iterators.hpp
Normal file
|
|
@ -0,0 +1,56 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <nano/lib/numbers.hpp>
|
||||||
|
|
||||||
|
#include <deque>
|
||||||
|
|
||||||
|
namespace nano
|
||||||
|
{
|
||||||
|
class store;
|
||||||
|
class transaction;
|
||||||
|
|
||||||
|
namespace bootstrap_ascending
|
||||||
|
{
|
||||||
|
class database_iterator
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
enum class table_type
|
||||||
|
{
|
||||||
|
account,
|
||||||
|
pending
|
||||||
|
};
|
||||||
|
|
||||||
|
explicit database_iterator (nano::store & store, table_type);
|
||||||
|
nano::account operator* () const;
|
||||||
|
void next (nano::transaction & tx);
|
||||||
|
|
||||||
|
private:
|
||||||
|
nano::store & store;
|
||||||
|
nano::account current{ 0 };
|
||||||
|
const table_type table;
|
||||||
|
};
|
||||||
|
|
||||||
|
class buffered_iterator
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit buffered_iterator (nano::store & store);
|
||||||
|
nano::account operator* () const;
|
||||||
|
nano::account next ();
|
||||||
|
// Indicates if a full ledger iteration has taken place e.g. warmed up
|
||||||
|
bool warmup () const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void fill ();
|
||||||
|
|
||||||
|
private:
|
||||||
|
nano::store & store;
|
||||||
|
std::deque<nano::account> buffer;
|
||||||
|
bool warmup_m{ true };
|
||||||
|
|
||||||
|
database_iterator accounts_iterator;
|
||||||
|
database_iterator pending_iterator;
|
||||||
|
|
||||||
|
static std::size_t constexpr size = 1024;
|
||||||
|
};
|
||||||
|
} // nano
|
||||||
|
} // bootstrap_ascending
|
||||||
517
nano/node/bootstrap_ascending/service.cpp
Normal file
517
nano/node/bootstrap_ascending/service.cpp
Normal file
|
|
@ -0,0 +1,517 @@
|
||||||
|
#include <nano/lib/stats_enums.hpp>
|
||||||
|
#include <nano/node/blockprocessor.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/service.hpp>
|
||||||
|
#include <nano/node/network.hpp>
|
||||||
|
#include <nano/node/nodeconfig.hpp>
|
||||||
|
#include <nano/node/transport/transport.hpp>
|
||||||
|
#include <nano/secure/common.hpp>
|
||||||
|
#include <nano/secure/ledger.hpp>
|
||||||
|
#include <nano/secure/store.hpp>
|
||||||
|
|
||||||
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* bootstrap_ascending
|
||||||
|
*/
|
||||||
|
|
||||||
|
nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a) :
|
||||||
|
config{ config_a },
|
||||||
|
network_consts{ config.network_params.network },
|
||||||
|
block_processor{ block_processor_a },
|
||||||
|
ledger{ ledger_a },
|
||||||
|
network{ network_a },
|
||||||
|
stats{ stat_a },
|
||||||
|
accounts{ stats },
|
||||||
|
iterator{ ledger.store },
|
||||||
|
throttle{ config.bootstrap_ascending.throttle_count },
|
||||||
|
limiter{ config.bootstrap_ascending.requests_limit, 1.0 },
|
||||||
|
database_limiter{ config.bootstrap_ascending.database_requests_limit, 1.0 }
|
||||||
|
{
|
||||||
|
// TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread
|
||||||
|
block_processor.batch_processed.add ([this] (auto const & batch) {
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
|
||||||
|
auto transaction = ledger.store.tx_begin_read ();
|
||||||
|
for (auto const & [result, block] : batch)
|
||||||
|
{
|
||||||
|
debug_assert (block != nullptr);
|
||||||
|
|
||||||
|
inspect (transaction, result, *block);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
condition.notify_all ();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
nano::bootstrap_ascending::service::~service ()
|
||||||
|
{
|
||||||
|
// All threads must be stopped before destruction
|
||||||
|
debug_assert (!thread.joinable ());
|
||||||
|
debug_assert (!timeout_thread.joinable ());
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::start ()
|
||||||
|
{
|
||||||
|
debug_assert (!thread.joinable ());
|
||||||
|
debug_assert (!timeout_thread.joinable ());
|
||||||
|
|
||||||
|
thread = std::thread ([this] () {
|
||||||
|
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
|
||||||
|
run ();
|
||||||
|
});
|
||||||
|
|
||||||
|
timeout_thread = std::thread ([this] () {
|
||||||
|
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
|
||||||
|
run_timeouts ();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::stop ()
|
||||||
|
{
|
||||||
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
|
stopped = true;
|
||||||
|
lock.unlock ();
|
||||||
|
condition.notify_all ();
|
||||||
|
nano::join_or_pass (thread);
|
||||||
|
nano::join_or_pass (timeout_thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::channel> channel, async_tag tag)
|
||||||
|
{
|
||||||
|
debug_assert (tag.type == async_tag::query_type::blocks_by_hash || tag.type == async_tag::query_type::blocks_by_account);
|
||||||
|
|
||||||
|
nano::asc_pull_req request{ network_consts };
|
||||||
|
request.id = tag.id;
|
||||||
|
request.type = nano::asc_pull_type::blocks;
|
||||||
|
|
||||||
|
nano::asc_pull_req::blocks_payload request_payload;
|
||||||
|
request_payload.start = tag.start;
|
||||||
|
request_payload.count = config.bootstrap_ascending.pull_count;
|
||||||
|
request_payload.start_type = (tag.type == async_tag::query_type::blocks_by_hash) ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account;
|
||||||
|
|
||||||
|
request.payload = request_payload;
|
||||||
|
request.update_header ();
|
||||||
|
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out);
|
||||||
|
|
||||||
|
// TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests
|
||||||
|
channel->send (
|
||||||
|
request, nullptr,
|
||||||
|
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t nano::bootstrap_ascending::service::priority_size () const
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
return accounts.priority_size ();
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t nano::bootstrap_ascending::service::blocked_size () const
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
return accounts.blocked_size ();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Inspects a block that has been processed by the block processor
|
||||||
|
- Marks an account as blocked if the result code is gap source as there is no reason request additional blocks for this account until the dependency is resolved
|
||||||
|
- Marks an account as forwarded if it has been recently referenced by a block that has been inserted.
|
||||||
|
*/
|
||||||
|
void nano::bootstrap_ascending::service::inspect (nano::transaction const & tx, nano::process_return const & result, nano::block const & block)
|
||||||
|
{
|
||||||
|
auto const hash = block.hash ();
|
||||||
|
|
||||||
|
switch (result.code)
|
||||||
|
{
|
||||||
|
case nano::process_result::progress:
|
||||||
|
{
|
||||||
|
const auto account = ledger.account (tx, hash);
|
||||||
|
const auto is_send = ledger.is_send (tx, block);
|
||||||
|
|
||||||
|
// If we've inserted any block in to an account, unmark it as blocked
|
||||||
|
accounts.unblock (account);
|
||||||
|
accounts.priority_up (account);
|
||||||
|
accounts.timestamp (account, /* reset timestamp */ true);
|
||||||
|
|
||||||
|
if (is_send)
|
||||||
|
{
|
||||||
|
// TODO: Encapsulate this as a helper somewhere
|
||||||
|
nano::account destination{ 0 };
|
||||||
|
switch (block.type ())
|
||||||
|
{
|
||||||
|
case nano::block_type::send:
|
||||||
|
destination = block.destination ();
|
||||||
|
break;
|
||||||
|
case nano::block_type::state:
|
||||||
|
destination = block.link ().as_account ();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
debug_assert (false, "unexpected block type");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (!destination.is_zero ())
|
||||||
|
{
|
||||||
|
accounts.unblock (destination, hash); // Unblocking automatically inserts account into priority set
|
||||||
|
accounts.priority_up (destination);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nano::process_result::gap_source:
|
||||||
|
{
|
||||||
|
const auto account = block.previous ().is_zero () ? block.account () : ledger.account (tx, block.previous ());
|
||||||
|
const auto source = block.source ().is_zero () ? block.link ().as_block_hash () : block.source ();
|
||||||
|
|
||||||
|
// Mark account as blocked because it is missing the source block
|
||||||
|
accounts.block (account, source);
|
||||||
|
|
||||||
|
// TODO: Track stats
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nano::process_result::old:
|
||||||
|
{
|
||||||
|
// TODO: Track stats
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nano::process_result::gap_previous:
|
||||||
|
{
|
||||||
|
// TODO: Track stats
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default: // No need to handle other cases
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::wait_blockprocessor ()
|
||||||
|
{
|
||||||
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
|
while (!stopped && block_processor.half_full ())
|
||||||
|
{
|
||||||
|
condition.wait_for (lock, 500ms, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::wait_available_request ()
|
||||||
|
{
|
||||||
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
|
while (!stopped && !limiter.should_pass (1))
|
||||||
|
{
|
||||||
|
condition.wait_for (lock, 50ms, [this] () { return stopped; }); // Give it at least some time to cooldown to avoid hitting the limit too frequently
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::service::available_channel ()
|
||||||
|
{
|
||||||
|
auto channels = network.random_set (32, network_consts.bootstrap_protocol_version_min, /* include temporary channels */ true);
|
||||||
|
for (auto & channel : channels)
|
||||||
|
{
|
||||||
|
if (!channel->max (nano::transport::traffic_type::bootstrap))
|
||||||
|
{
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::service::wait_available_channel ()
|
||||||
|
{
|
||||||
|
std::shared_ptr<nano::transport::channel> channel;
|
||||||
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
|
while (!stopped && !(channel = available_channel ()))
|
||||||
|
{
|
||||||
|
condition.wait_for (lock, 100ms, [this] () { return stopped; });
|
||||||
|
}
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
nano::account nano::bootstrap_ascending::service::available_account ()
|
||||||
|
{
|
||||||
|
{
|
||||||
|
auto account = accounts.next ();
|
||||||
|
if (!account.is_zero ())
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_priority);
|
||||||
|
return account;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (database_limiter.should_pass (1))
|
||||||
|
{
|
||||||
|
auto account = iterator.next ();
|
||||||
|
if (!account.is_zero ())
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_database);
|
||||||
|
return account;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_none);
|
||||||
|
return { 0 };
|
||||||
|
}
|
||||||
|
|
||||||
|
nano::account nano::bootstrap_ascending::service::wait_available_account ()
|
||||||
|
{
|
||||||
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
|
while (!stopped)
|
||||||
|
{
|
||||||
|
auto account = available_account ();
|
||||||
|
if (!account.is_zero ())
|
||||||
|
{
|
||||||
|
accounts.timestamp (account);
|
||||||
|
return account;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
condition.wait_for (lock, 100ms);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return { 0 };
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nano::bootstrap_ascending::service::request (nano::account & account, std::shared_ptr<nano::transport::channel> & channel)
|
||||||
|
{
|
||||||
|
async_tag tag{};
|
||||||
|
tag.id = nano::bootstrap_ascending::generate_id ();
|
||||||
|
tag.account = account;
|
||||||
|
tag.time = nano::milliseconds_since_epoch ();
|
||||||
|
|
||||||
|
// Check if the account picked has blocks, if it does, start the pull from the highest block
|
||||||
|
auto info = ledger.store.account.get (ledger.store.tx_begin_read (), account);
|
||||||
|
if (info)
|
||||||
|
{
|
||||||
|
tag.type = async_tag::query_type::blocks_by_hash;
|
||||||
|
tag.start = info->head;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
tag.type = async_tag::query_type::blocks_by_account;
|
||||||
|
tag.start = account;
|
||||||
|
}
|
||||||
|
|
||||||
|
on_request.notify (tag, channel);
|
||||||
|
|
||||||
|
track (tag);
|
||||||
|
send (channel, tag);
|
||||||
|
|
||||||
|
return true; // Request sent
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nano::bootstrap_ascending::service::run_one ()
|
||||||
|
{
|
||||||
|
// Ensure there is enough space in blockprocessor for queuing new blocks
|
||||||
|
wait_blockprocessor ();
|
||||||
|
|
||||||
|
// Do not do too many requests in parallel, impose throttling
|
||||||
|
wait_available_request ();
|
||||||
|
|
||||||
|
// Waits for channel that is not full
|
||||||
|
auto channel = wait_available_channel ();
|
||||||
|
if (!channel)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Waits for account either from priority queue or database
|
||||||
|
auto account = wait_available_account ();
|
||||||
|
if (account.is_zero ())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool success = request (account, channel);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::throttle_if_needed (nano::unique_lock<nano::mutex> & lock)
|
||||||
|
{
|
||||||
|
debug_assert (lock.owns_lock ());
|
||||||
|
if (!iterator.warmup () && throttle.throttled ())
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::throttled);
|
||||||
|
condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::run ()
|
||||||
|
{
|
||||||
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
|
while (!stopped)
|
||||||
|
{
|
||||||
|
lock.unlock ();
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop);
|
||||||
|
run_one ();
|
||||||
|
lock.lock ();
|
||||||
|
throttle_if_needed (lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::run_timeouts ()
|
||||||
|
{
|
||||||
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
|
while (!stopped)
|
||||||
|
{
|
||||||
|
auto & tags_by_order = tags.get<tag_sequenced> ();
|
||||||
|
while (!tags_by_order.empty () && nano::time_difference (tags_by_order.front ().time, nano::milliseconds_since_epoch ()) > config.bootstrap_ascending.timeout)
|
||||||
|
{
|
||||||
|
auto tag = tags_by_order.front ();
|
||||||
|
tags_by_order.pop_front ();
|
||||||
|
on_timeout.notify (tag);
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout);
|
||||||
|
}
|
||||||
|
condition.wait_for (lock, 1s, [this] () { return stopped; });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack & message)
|
||||||
|
{
|
||||||
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
|
|
||||||
|
// Only process messages that have a known tag
|
||||||
|
auto & tags_by_id = tags.get<tag_id> ();
|
||||||
|
if (tags_by_id.count (message.id) > 0)
|
||||||
|
{
|
||||||
|
auto iterator = tags_by_id.find (message.id);
|
||||||
|
auto tag = *iterator;
|
||||||
|
tags_by_id.erase (iterator);
|
||||||
|
|
||||||
|
lock.unlock ();
|
||||||
|
|
||||||
|
on_reply.notify (tag);
|
||||||
|
condition.notify_all ();
|
||||||
|
std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::missing_tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply);
|
||||||
|
|
||||||
|
auto result = verify (response, tag);
|
||||||
|
switch (result)
|
||||||
|
{
|
||||||
|
case verify_result::ok:
|
||||||
|
{
|
||||||
|
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ());
|
||||||
|
|
||||||
|
for (auto & block : response.blocks)
|
||||||
|
{
|
||||||
|
block_processor.add (block);
|
||||||
|
}
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
throttle.add (true);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case verify_result::nothing_new:
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new);
|
||||||
|
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
accounts.priority_down (tag.account);
|
||||||
|
throttle.add (false);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case verify_result::invalid:
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid);
|
||||||
|
// TODO: Log
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::account_info_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
|
||||||
|
{
|
||||||
|
// TODO: Make use of account info
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::process (const nano::empty_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
|
||||||
|
{
|
||||||
|
// Should not happen
|
||||||
|
debug_assert (false, "empty payload");
|
||||||
|
}
|
||||||
|
|
||||||
|
nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::service::verify (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) const
|
||||||
|
{
|
||||||
|
auto const & blocks = response.blocks;
|
||||||
|
|
||||||
|
if (blocks.empty ())
|
||||||
|
{
|
||||||
|
return verify_result::nothing_new;
|
||||||
|
}
|
||||||
|
if (blocks.size () == 1 && blocks.front ()->hash () == tag.start.as_block_hash ())
|
||||||
|
{
|
||||||
|
return verify_result::nothing_new;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto const & first = blocks.front ();
|
||||||
|
switch (tag.type)
|
||||||
|
{
|
||||||
|
case async_tag::query_type::blocks_by_hash:
|
||||||
|
{
|
||||||
|
if (first->hash () != tag.start.as_block_hash ())
|
||||||
|
{
|
||||||
|
// TODO: Stat & log
|
||||||
|
return verify_result::invalid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case async_tag::query_type::blocks_by_account:
|
||||||
|
{
|
||||||
|
// Open & state blocks always contain account field
|
||||||
|
if (first->account () != tag.start.as_account ())
|
||||||
|
{
|
||||||
|
// TODO: Stat & log
|
||||||
|
return verify_result::invalid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
return verify_result::invalid;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify blocks make a valid chain
|
||||||
|
nano::block_hash previous_hash = blocks.front ()->hash ();
|
||||||
|
for (int n = 1; n < blocks.size (); ++n)
|
||||||
|
{
|
||||||
|
auto & block = blocks[n];
|
||||||
|
if (block->previous () != previous_hash)
|
||||||
|
{
|
||||||
|
// TODO: Stat & log
|
||||||
|
return verify_result::invalid; // Blocks do not make a chain
|
||||||
|
}
|
||||||
|
previous_hash = block->hash ();
|
||||||
|
}
|
||||||
|
|
||||||
|
return verify_result::ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::service::track (async_tag const & tag)
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::track);
|
||||||
|
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
debug_assert (tags.get<tag_id> ().count (tag.id) == 0);
|
||||||
|
tags.get<tag_id> ().insert (tag);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto nano::bootstrap_ascending::service::info () const -> nano::bootstrap_ascending::account_sets::info_t
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
return accounts.info ();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::service::collect_container_info (std::string const & name)
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
|
||||||
|
auto composite = std::make_unique<container_info_composite> (name);
|
||||||
|
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "tags", tags.size (), sizeof (decltype (tags)::value_type) }));
|
||||||
|
composite->add_component (accounts.collect_container_info ("accounts"));
|
||||||
|
return composite;
|
||||||
|
}
|
||||||
170
nano/node/bootstrap_ascending/service.hpp
Normal file
170
nano/node/bootstrap_ascending/service.hpp
Normal file
|
|
@ -0,0 +1,170 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <nano/lib/locks.hpp>
|
||||||
|
#include <nano/lib/numbers.hpp>
|
||||||
|
#include <nano/lib/observer_set.hpp>
|
||||||
|
#include <nano/lib/timer.hpp>
|
||||||
|
#include <nano/node/bandwidth_limiter.hpp>
|
||||||
|
#include <nano/node/bootstrap/bootstrap_config.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/account_sets.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/common.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/iterators.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/throttle.hpp>
|
||||||
|
|
||||||
|
#include <boost/multi_index/hashed_index.hpp>
|
||||||
|
#include <boost/multi_index/mem_fun.hpp>
|
||||||
|
#include <boost/multi_index/member.hpp>
|
||||||
|
#include <boost/multi_index/ordered_index.hpp>
|
||||||
|
#include <boost/multi_index/random_access_index.hpp>
|
||||||
|
#include <boost/multi_index/sequenced_index.hpp>
|
||||||
|
#include <boost/multi_index_container.hpp>
|
||||||
|
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
namespace mi = boost::multi_index;
|
||||||
|
|
||||||
|
namespace nano
|
||||||
|
{
|
||||||
|
class block_processor;
|
||||||
|
class ledger;
|
||||||
|
class network;
|
||||||
|
class node_config;
|
||||||
|
class transaction;
|
||||||
|
|
||||||
|
namespace transport
|
||||||
|
{
|
||||||
|
class channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace bootstrap_ascending
|
||||||
|
{
|
||||||
|
class service
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
service (nano::node_config &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &);
|
||||||
|
~service ();
|
||||||
|
|
||||||
|
void start ();
|
||||||
|
void stop ();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process `asc_pull_ack` message coming from network
|
||||||
|
*/
|
||||||
|
void process (nano::asc_pull_ack const & message);
|
||||||
|
|
||||||
|
public: // Container info
|
||||||
|
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
|
||||||
|
size_t blocked_size () const;
|
||||||
|
size_t priority_size () const;
|
||||||
|
|
||||||
|
private: // Dependencies
|
||||||
|
nano::node_config & config;
|
||||||
|
nano::network_constants & network_consts;
|
||||||
|
nano::block_processor & block_processor;
|
||||||
|
nano::ledger & ledger;
|
||||||
|
nano::network & network;
|
||||||
|
nano::stats & stats;
|
||||||
|
|
||||||
|
public: // async_tag
|
||||||
|
struct async_tag
|
||||||
|
{
|
||||||
|
enum class query_type
|
||||||
|
{
|
||||||
|
invalid = 0, // Default initialization
|
||||||
|
blocks_by_hash,
|
||||||
|
blocks_by_account,
|
||||||
|
// TODO: account_info,
|
||||||
|
};
|
||||||
|
|
||||||
|
query_type type{ query_type::invalid };
|
||||||
|
nano::bootstrap_ascending::id_t id{ 0 };
|
||||||
|
nano::hash_or_account start{ 0 };
|
||||||
|
nano::millis_t time{ 0 };
|
||||||
|
nano::account account{ 0 };
|
||||||
|
};
|
||||||
|
|
||||||
|
public: // Events
|
||||||
|
nano::observer_set<async_tag const &, std::shared_ptr<nano::transport::channel> &> on_request;
|
||||||
|
nano::observer_set<async_tag const &> on_reply;
|
||||||
|
nano::observer_set<async_tag const &> on_timeout;
|
||||||
|
|
||||||
|
private:
|
||||||
|
/* Inspects a block that has been processed by the block processor */
|
||||||
|
void inspect (nano::transaction const &, nano::process_return const & result, nano::block const & block);
|
||||||
|
|
||||||
|
void throttle_if_needed (nano::unique_lock<nano::mutex> & lock);
|
||||||
|
void run ();
|
||||||
|
bool run_one ();
|
||||||
|
void run_timeouts ();
|
||||||
|
|
||||||
|
/* Limits the number of requests per second we make */
|
||||||
|
void wait_available_request ();
|
||||||
|
/* Throttles requesting new blocks, not to overwhelm blockprocessor */
|
||||||
|
void wait_blockprocessor ();
|
||||||
|
/* Waits for channel with free capacity for bootstrap messages */
|
||||||
|
std::shared_ptr<nano::transport::channel> wait_available_channel ();
|
||||||
|
std::shared_ptr<nano::transport::channel> available_channel ();
|
||||||
|
/* Waits until a suitable account outside of cool down period is available */
|
||||||
|
nano::account available_account ();
|
||||||
|
nano::account wait_available_account ();
|
||||||
|
|
||||||
|
bool request (nano::account &, std::shared_ptr<nano::transport::channel> &);
|
||||||
|
void send (std::shared_ptr<nano::transport::channel>, async_tag tag);
|
||||||
|
void track (async_tag const & tag);
|
||||||
|
|
||||||
|
void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag);
|
||||||
|
void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag);
|
||||||
|
void process (nano::empty_payload const & response, async_tag const & tag);
|
||||||
|
|
||||||
|
enum class verify_result
|
||||||
|
{
|
||||||
|
ok,
|
||||||
|
nothing_new,
|
||||||
|
invalid,
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies whether the received response is valid. Returns:
|
||||||
|
* - invalid: when received blocks do not correspond to requested hash/account or they do not make a valid chain
|
||||||
|
* - nothing_new: when received response indicates that the account chain does not have more blocks
|
||||||
|
* - ok: otherwise, if all checks pass
|
||||||
|
*/
|
||||||
|
verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const;
|
||||||
|
|
||||||
|
public: // account_sets
|
||||||
|
nano::bootstrap_ascending::account_sets::info_t info () const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
nano::bootstrap_ascending::account_sets accounts;
|
||||||
|
nano::bootstrap_ascending::buffered_iterator iterator;
|
||||||
|
nano::bootstrap_ascending::throttle throttle;
|
||||||
|
|
||||||
|
// clang-format off
|
||||||
|
class tag_sequenced {};
|
||||||
|
class tag_id {};
|
||||||
|
class tag_account {};
|
||||||
|
|
||||||
|
using ordered_tags = boost::multi_index_container<async_tag,
|
||||||
|
mi::indexed_by<
|
||||||
|
mi::sequenced<mi::tag<tag_sequenced>>,
|
||||||
|
mi::hashed_unique<mi::tag<tag_id>,
|
||||||
|
mi::member<async_tag, nano::bootstrap_ascending::id_t, &async_tag::id>>,
|
||||||
|
mi::hashed_non_unique<mi::tag<tag_account>,
|
||||||
|
mi::member<async_tag, nano::account , &async_tag::account>>
|
||||||
|
>>;
|
||||||
|
// clang-format on
|
||||||
|
ordered_tags tags;
|
||||||
|
|
||||||
|
nano::bandwidth_limiter limiter;
|
||||||
|
// Requests for accounts from database have much lower hitrate and could introduce strain on the network
|
||||||
|
// A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue
|
||||||
|
nano::bandwidth_limiter database_limiter;
|
||||||
|
|
||||||
|
bool stopped{ false };
|
||||||
|
mutable nano::mutex mutex;
|
||||||
|
mutable nano::condition_variable condition;
|
||||||
|
std::thread thread;
|
||||||
|
std::thread timeout_thread;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
25
nano/node/bootstrap_ascending/throttle.cpp
Normal file
25
nano/node/bootstrap_ascending/throttle.cpp
Normal file
|
|
@ -0,0 +1,25 @@
|
||||||
|
#include <nano/node/bootstrap_ascending/throttle.hpp>
|
||||||
|
|
||||||
|
nano::bootstrap_ascending::throttle::throttle (std::size_t count) :
|
||||||
|
successes{ count },
|
||||||
|
samples{ count, true }
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nano::bootstrap_ascending::throttle::throttled () const
|
||||||
|
{
|
||||||
|
return successes == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::bootstrap_ascending::throttle::add (bool sample)
|
||||||
|
{
|
||||||
|
if (samples.front ())
|
||||||
|
{
|
||||||
|
--successes;
|
||||||
|
}
|
||||||
|
samples.push_back (sample);
|
||||||
|
if (sample)
|
||||||
|
{
|
||||||
|
++successes;
|
||||||
|
}
|
||||||
|
}
|
||||||
23
nano/node/bootstrap_ascending/throttle.hpp
Normal file
23
nano/node/bootstrap_ascending/throttle.hpp
Normal file
|
|
@ -0,0 +1,23 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <boost/circular_buffer.hpp>
|
||||||
|
|
||||||
|
namespace nano::bootstrap_ascending
|
||||||
|
{
|
||||||
|
// Class used to throttle the ascending bootstrapper once it reaches a steady state
|
||||||
|
// Tracks verify_result samples and signals throttling if no tracked samples have gotten results
|
||||||
|
class throttle
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
// Initialized with all true samples
|
||||||
|
explicit throttle (std::size_t size);
|
||||||
|
bool throttled () const;
|
||||||
|
void add (bool success);
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Rolling count of true samples in the sample buffer
|
||||||
|
std::size_t successes;
|
||||||
|
// Circular buffer that tracks sample results. True when something was retrieved, false otherwise
|
||||||
|
boost::circular_buffer<bool> samples;
|
||||||
|
};
|
||||||
|
} // nano::boostrap_ascending
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
#include <nano/lib/config.hpp>
|
#include <nano/lib/config.hpp>
|
||||||
#include <nano/lib/json_error_response.hpp>
|
#include <nano/lib/json_error_response.hpp>
|
||||||
#include <nano/lib/timer.hpp>
|
#include <nano/lib/timer.hpp>
|
||||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
|
||||||
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
|
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/service.hpp>
|
||||||
#include <nano/node/common.hpp>
|
#include <nano/node/common.hpp>
|
||||||
#include <nano/node/election.hpp>
|
#include <nano/node/election.hpp>
|
||||||
#include <nano/node/json_handler.hpp>
|
#include <nano/node/json_handler.hpp>
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
#include <nano/crypto_lib/random_pool_shuffle.hpp>
|
#include <nano/crypto_lib/random_pool_shuffle.hpp>
|
||||||
#include <nano/lib/threading.hpp>
|
#include <nano/lib/threading.hpp>
|
||||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
#include <nano/node/bootstrap_ascending/service.hpp>
|
||||||
#include <nano/node/network.hpp>
|
#include <nano/node/network.hpp>
|
||||||
#include <nano/node/node.hpp>
|
#include <nano/node/node.hpp>
|
||||||
#include <nano/node/telemetry.hpp>
|
#include <nano/node/telemetry.hpp>
|
||||||
|
|
|
||||||
|
|
@ -11,9 +11,9 @@
|
||||||
#include <nano/node/block_publisher.hpp>
|
#include <nano/node/block_publisher.hpp>
|
||||||
#include <nano/node/blockprocessor.hpp>
|
#include <nano/node/blockprocessor.hpp>
|
||||||
#include <nano/node/bootstrap/bootstrap.hpp>
|
#include <nano/node/bootstrap/bootstrap.hpp>
|
||||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
|
||||||
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
||||||
#include <nano/node/bootstrap/bootstrap_server.hpp>
|
#include <nano/node/bootstrap/bootstrap_server.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/service.hpp>
|
||||||
#include <nano/node/confirmation_height_processor.hpp>
|
#include <nano/node/confirmation_height_processor.hpp>
|
||||||
#include <nano/node/distributed_work_factory.hpp>
|
#include <nano/node/distributed_work_factory.hpp>
|
||||||
#include <nano/node/election.hpp>
|
#include <nano/node/election.hpp>
|
||||||
|
|
@ -193,7 +193,7 @@ public:
|
||||||
nano::request_aggregator aggregator;
|
nano::request_aggregator aggregator;
|
||||||
nano::wallets wallets;
|
nano::wallets wallets;
|
||||||
nano::backlog_population backlog;
|
nano::backlog_population backlog;
|
||||||
nano::bootstrap_ascending ascendboot;
|
nano::bootstrap_ascending::service ascendboot;
|
||||||
nano::websocket_server websocket;
|
nano::websocket_server websocket;
|
||||||
nano::epoch_upgrader epoch_upgrader;
|
nano::epoch_upgrader epoch_upgrader;
|
||||||
nano::block_broadcast block_broadcast;
|
nano::block_broadcast block_broadcast;
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
#include <nano/lib/rpcconfig.hpp>
|
#include <nano/lib/rpcconfig.hpp>
|
||||||
#include <nano/node/bootstrap/bootstrap_ascending.hpp>
|
|
||||||
#include <nano/node/bootstrap/bootstrap_server.hpp>
|
#include <nano/node/bootstrap/bootstrap_server.hpp>
|
||||||
|
#include <nano/node/bootstrap_ascending/service.hpp>
|
||||||
#include <nano/node/ipc/ipc_server.hpp>
|
#include <nano/node/ipc/ipc_server.hpp>
|
||||||
#include <nano/node/json_handler.hpp>
|
#include <nano/node/json_handler.hpp>
|
||||||
#include <nano/node/transport/transport.hpp>
|
#include <nano/node/transport/transport.hpp>
|
||||||
|
|
@ -110,7 +110,7 @@ TEST (bootstrap_ascending, profile)
|
||||||
|
|
||||||
struct entry
|
struct entry
|
||||||
{
|
{
|
||||||
nano::bootstrap_ascending::async_tag tag;
|
nano::bootstrap_ascending::service::async_tag tag;
|
||||||
std::shared_ptr<nano::transport::channel> request_channel;
|
std::shared_ptr<nano::transport::channel> request_channel;
|
||||||
std::shared_ptr<nano::transport::channel> reply_channel;
|
std::shared_ptr<nano::transport::channel> reply_channel;
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue