Improve batching multiple blocks pending confirmation height processing (#2105)
* Improve batching multiple blocks pending confirmation height processing * Add batch time as config option * Apply review comments from cryptocode
This commit is contained in:
parent
2c53ab098f
commit
70784aa914
15 changed files with 323 additions and 53 deletions
|
@ -2168,6 +2168,43 @@ TEST (confirmation_height, observers)
|
|||
ASSERT_EQ (1, node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
}
|
||||
|
||||
// This tests when a read has been done and the block no longer exists by the time a write is done
|
||||
TEST (confirmation_height, modified_chain)
|
||||
{
|
||||
nano::system system;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.delay_frontier_confirmation_height_updating = true;
|
||||
auto node = system.add_node (nano::node_config (24000, system.logging), node_flags);
|
||||
|
||||
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
|
||||
nano::block_hash latest (node->latest (nano::test_genesis_key.pub));
|
||||
|
||||
nano::keypair key1;
|
||||
auto & store = node->store;
|
||||
auto send = std::make_shared<nano::send_block> (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest));
|
||||
|
||||
{
|
||||
auto transaction = node->store.tx_begin_write ();
|
||||
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send).code);
|
||||
}
|
||||
|
||||
node->confirmation_height_processor.add (send->hash ());
|
||||
|
||||
{
|
||||
// The write guard prevents the confirmation height processor doing any writes
|
||||
auto write_guard = node->write_database_queue.wait (nano::writer::process_batch);
|
||||
while (!node->write_database_queue.contains (nano::writer::confirmation_height))
|
||||
;
|
||||
|
||||
store.block_del (store.tx_begin_write (), send->hash ());
|
||||
}
|
||||
|
||||
while (node->write_database_queue.contains (nano::writer::confirmation_height))
|
||||
;
|
||||
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block, nano::stat::dir::in));
|
||||
}
|
||||
|
||||
namespace nano
|
||||
{
|
||||
TEST (confirmation_height, pending_observer_callbacks)
|
||||
|
|
|
@ -721,6 +721,7 @@ TEST (node_config, v16_v17_upgrade)
|
|||
ASSERT_FALSE (tree.get_optional_child ("confirmation_history_size"));
|
||||
ASSERT_FALSE (tree.get_optional_child ("active_elections_size"));
|
||||
ASSERT_FALSE (tree.get_optional_child ("bandwidth_limit"));
|
||||
ASSERT_FALSE (tree.get_optional_child ("conf_height_processor_batch_min_time"));
|
||||
|
||||
config.deserialize_json (upgraded, tree);
|
||||
// The config options should be added after the upgrade
|
||||
|
@ -735,6 +736,7 @@ TEST (node_config, v16_v17_upgrade)
|
|||
ASSERT_TRUE (!!tree.get_optional_child ("confirmation_history_size"));
|
||||
ASSERT_TRUE (!!tree.get_optional_child ("active_elections_size"));
|
||||
ASSERT_TRUE (!!tree.get_optional_child ("bandwidth_limit"));
|
||||
ASSERT_TRUE (!!tree.get_optional_child ("conf_height_processor_batch_min_time"));
|
||||
|
||||
ASSERT_TRUE (upgraded);
|
||||
auto version (tree.get<std::string> ("version"));
|
||||
|
@ -773,6 +775,7 @@ TEST (node_config, v17_values)
|
|||
tree.put ("confirmation_history_size", 2048);
|
||||
tree.put ("active_elections_size", 8000);
|
||||
tree.put ("bandwidth_limit", 1572864);
|
||||
tree.put ("conf_height_processor_batch_min_time", 0);
|
||||
}
|
||||
|
||||
config.deserialize_json (upgraded, tree);
|
||||
|
@ -790,6 +793,7 @@ TEST (node_config, v17_values)
|
|||
ASSERT_EQ (config.confirmation_history_size, 2048);
|
||||
ASSERT_EQ (config.active_elections_size, 8000);
|
||||
ASSERT_EQ (config.bandwidth_limit, 1572864);
|
||||
ASSERT_EQ (config.conf_height_processor_batch_min_time.count (), 0);
|
||||
|
||||
// Check config is correct with other values
|
||||
tree.put ("tcp_io_timeout", std::numeric_limits<unsigned long>::max () - 100);
|
||||
|
@ -810,6 +814,7 @@ TEST (node_config, v17_values)
|
|||
tree.put ("confirmation_history_size", std::numeric_limits<unsigned long long>::max ());
|
||||
tree.put ("active_elections_size", std::numeric_limits<unsigned long long>::max ());
|
||||
tree.put ("bandwidth_limit", std::numeric_limits<size_t>::max ());
|
||||
tree.put ("conf_height_processor_batch_min_time", 500);
|
||||
|
||||
upgraded = false;
|
||||
config.deserialize_json (upgraded, tree);
|
||||
|
@ -829,6 +834,7 @@ TEST (node_config, v17_values)
|
|||
ASSERT_EQ (config.confirmation_history_size, std::numeric_limits<unsigned long long>::max ());
|
||||
ASSERT_EQ (config.active_elections_size, std::numeric_limits<unsigned long long>::max ());
|
||||
ASSERT_EQ (config.bandwidth_limit, std::numeric_limits<size_t>::max ());
|
||||
ASSERT_EQ (config.conf_height_processor_batch_min_time.count (), 500);
|
||||
}
|
||||
|
||||
// Regression test to ensure that deserializing includes changes node via get_required_child
|
||||
|
|
|
@ -92,6 +92,8 @@ add_library (node
|
|||
websocket.cpp
|
||||
websocketconfig.hpp
|
||||
websocketconfig.cpp
|
||||
write_database_queue.hpp
|
||||
write_database_queue.cpp
|
||||
xorshift.hpp)
|
||||
|
||||
target_link_libraries (node
|
||||
|
|
|
@ -448,8 +448,6 @@ bool nano::active_transactions::add (std::shared_ptr<nano::block> block_a, std::
|
|||
auto existing (roots.find (root));
|
||||
if (existing == roots.end () && confirmed_set.get<1> ().find (root) == confirmed_set.get<1> ().end ())
|
||||
{
|
||||
// Check if existing block is already confirmed
|
||||
assert (node.ledger.block_not_confirmed_or_not_exists (*block_a));
|
||||
auto hash (block_a->hash ());
|
||||
auto election (nano::make_shared<nano::election> (node, block_a, confirmation_action_a));
|
||||
uint64_t difficulty (0);
|
||||
|
|
|
@ -7,12 +7,13 @@
|
|||
|
||||
std::chrono::milliseconds constexpr nano::block_processor::confirmation_request_delay;
|
||||
|
||||
nano::block_processor::block_processor (nano::node & node_a) :
|
||||
nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
|
||||
generator (node_a),
|
||||
stopped (false),
|
||||
active (false),
|
||||
next_log (std::chrono::steady_clock::now ()),
|
||||
node (node_a)
|
||||
node (node_a),
|
||||
write_database_queue (write_database_queue_a)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -241,6 +242,7 @@ void nano::block_processor::process_batch (std::unique_lock<std::mutex> & lock_a
|
|||
}
|
||||
}
|
||||
lock_a.unlock ();
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
|
||||
auto transaction (node.store.tx_begin_write ());
|
||||
timer_l.restart ();
|
||||
lock_a.lock ();
|
||||
|
|
|
@ -18,6 +18,7 @@ namespace nano
|
|||
{
|
||||
class node;
|
||||
class transaction;
|
||||
class write_database_queue;
|
||||
|
||||
class rolled_hash
|
||||
{
|
||||
|
@ -32,7 +33,7 @@ public:
|
|||
class block_processor final
|
||||
{
|
||||
public:
|
||||
explicit block_processor (nano::node &);
|
||||
explicit block_processor (nano::node &, nano::write_database_queue &);
|
||||
~block_processor ();
|
||||
void stop ();
|
||||
void flush ();
|
||||
|
@ -70,6 +71,7 @@ private:
|
|||
static size_t const rolled_back_max = 1024;
|
||||
std::condition_variable condition;
|
||||
nano::node & node;
|
||||
nano::write_database_queue & write_database_queue;
|
||||
std::mutex mutex;
|
||||
|
||||
friend std::unique_ptr<seq_con_info_component> collect_seq_con_info (block_processor & block_processor, const std::string & name);
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
#include <nano/lib/utility.hpp>
|
||||
#include <nano/node/active_transactions.hpp>
|
||||
#include <nano/node/confirmation_height_processor.hpp>
|
||||
#include <nano/node/write_database_queue.hpp>
|
||||
#include <nano/secure/blockstore.hpp>
|
||||
#include <nano/secure/common.hpp>
|
||||
|
||||
|
@ -12,28 +13,15 @@
|
|||
#include <cassert>
|
||||
#include <numeric>
|
||||
|
||||
namespace
|
||||
{
|
||||
class confirmed_iterated_pair
|
||||
{
|
||||
public:
|
||||
confirmed_iterated_pair (uint64_t confirmed_height_a, uint64_t iterated_height_a) :
|
||||
confirmed_height (confirmed_height_a), iterated_height (iterated_height_a)
|
||||
{
|
||||
}
|
||||
|
||||
uint64_t confirmed_height;
|
||||
uint64_t iterated_height;
|
||||
};
|
||||
}
|
||||
|
||||
nano::confirmation_height_processor::confirmation_height_processor (nano::pending_confirmation_height & pending_confirmation_height_a, nano::block_store & store_a, nano::stat & stats_a, nano::active_transactions & active_a, nano::block_hash const & epoch_link_a, nano::logger_mt & logger_a) :
|
||||
nano::confirmation_height_processor::confirmation_height_processor (nano::pending_confirmation_height & pending_confirmation_height_a, nano::block_store & store_a, nano::stat & stats_a, nano::active_transactions & active_a, nano::block_hash const & epoch_link_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a) :
|
||||
pending_confirmations (pending_confirmation_height_a),
|
||||
store (store_a),
|
||||
stats (stats_a),
|
||||
active (active_a),
|
||||
epoch_link (epoch_link_a),
|
||||
logger (logger_a),
|
||||
write_database_queue (write_database_queue_a),
|
||||
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
|
||||
thread ([this]() {
|
||||
nano::thread_role::set (nano::thread_role::name::confirmation_height_processing);
|
||||
this->run ();
|
||||
|
@ -68,13 +56,30 @@ void nano::confirmation_height_processor::run ()
|
|||
// Copy the hash so can be used outside owning the lock
|
||||
auto current_pending_block = pending_confirmations.current_hash;
|
||||
lk.unlock ();
|
||||
if (pending_writes.empty ())
|
||||
{
|
||||
// Separate blocks which are pending confirmation height can be batched by a minimum processing time (to improve disk write performance), so make sure the slate is clean when a new batch is starting.
|
||||
confirmed_iterated_pairs.clear ();
|
||||
timer.restart ();
|
||||
}
|
||||
add_confirmation_height (current_pending_block);
|
||||
lk.lock ();
|
||||
pending_confirmations.current_hash = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
condition.wait (lk);
|
||||
// If there are no blocks pending confirmation, then make sure we flush out the remaining writes
|
||||
if (!pending_writes.empty ())
|
||||
{
|
||||
lk.unlock ();
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
write_pending (pending_writes);
|
||||
lk.lock ();
|
||||
}
|
||||
else
|
||||
{
|
||||
condition.wait (lk);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -99,15 +104,8 @@ void nano::confirmation_height_processor::add_confirmation_height (nano::block_h
|
|||
boost::optional<conf_height_details> receive_details;
|
||||
auto current = hash_a;
|
||||
nano::account_info account_info;
|
||||
std::deque<conf_height_details> pending_writes;
|
||||
assert (receive_source_pairs_size == 0);
|
||||
|
||||
// Store the highest confirmation heights for accounts in pending_writes to reduce unnecessary iterating,
|
||||
// and iterated height to prevent iterating over the same blocks more than once from self-sends or "circular" sends between the same accounts.
|
||||
std::unordered_map<account, confirmed_iterated_pair> confirmed_iterated_pairs;
|
||||
|
||||
release_assert (receive_source_pairs.empty ());
|
||||
auto error = false;
|
||||
|
||||
auto read_transaction (store.tx_begin_read ());
|
||||
// Traverse account chain and all sources for receive blocks iteratively
|
||||
|
@ -226,23 +224,28 @@ void nano::confirmation_height_processor::add_confirmation_height (nano::block_h
|
|||
}
|
||||
}
|
||||
|
||||
// Check whether writing to the database should be done now
|
||||
auto total_pending_write_block_count = std::accumulate (pending_writes.cbegin (), pending_writes.cend (), uint64_t (0), [](uint64_t total, conf_height_details const & conf_height_details_a) {
|
||||
return total += conf_height_details_a.num_blocks_confirmed;
|
||||
});
|
||||
auto max_write_size_reached = (pending_writes.size () >= batch_write_size);
|
||||
// When there are a lot of pending confirmation height blocks, it is more efficient to
|
||||
// bulk some of them up to enable better write performance which becomes the bottleneck.
|
||||
auto min_time_exceeded = (timer.since_start () >= batch_separate_pending_min_time);
|
||||
auto finished_iterating = receive_source_pairs.empty ();
|
||||
auto no_pending = pending_confirmations.size () == 0;
|
||||
auto should_output = finished_iterating && (no_pending || min_time_exceeded);
|
||||
|
||||
if ((pending_writes.size () >= batch_write_size || receive_source_pairs.empty ()) && !pending_writes.empty ())
|
||||
if ((max_write_size_reached || should_output) && !pending_writes.empty ())
|
||||
{
|
||||
error = write_pending (pending_writes, total_pending_write_block_count);
|
||||
// Don't set any more blocks as confirmed from the original hash if an inconsistency is found
|
||||
if (error)
|
||||
if (write_database_queue.process (nano::writer::confirmation_height))
|
||||
{
|
||||
receive_source_pairs.clear ();
|
||||
receive_source_pairs_size = 0;
|
||||
break;
|
||||
auto scoped_write_guard = write_database_queue.pop ();
|
||||
auto error = write_pending (pending_writes);
|
||||
// Don't set any more blocks as confirmed from the original hash if an inconsistency is found
|
||||
if (error)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert (pending_writes.empty ());
|
||||
}
|
||||
|
||||
// Exit early when the processor has been stopped, otherwise this function may take a
|
||||
// while (and hence keep the process running) if updating a long chain.
|
||||
if (stopped)
|
||||
|
@ -257,10 +260,12 @@ void nano::confirmation_height_processor::add_confirmation_height (nano::block_h
|
|||
/*
|
||||
* Returns true if there was an error in finding one of the blocks to write a confirmation height for, false otherwise
|
||||
*/
|
||||
bool nano::confirmation_height_processor::write_pending (std::deque<conf_height_details> & all_pending_a, int64_t total_pending_write_block_count_a)
|
||||
bool nano::confirmation_height_processor::write_pending (std::deque<conf_height_details> & all_pending_a)
|
||||
{
|
||||
nano::account_info account_info;
|
||||
auto total_pending_write_block_count (total_pending_write_block_count_a);
|
||||
auto total_pending_write_block_count = std::accumulate (all_pending_a.cbegin (), all_pending_a.cend (), uint64_t (0), [](uint64_t total, conf_height_details const & conf_height_details_a) {
|
||||
return total += conf_height_details_a.num_blocks_confirmed;
|
||||
});
|
||||
|
||||
// Write in batches
|
||||
while (total_pending_write_block_count > 0)
|
||||
|
@ -278,8 +283,9 @@ bool nano::confirmation_height_processor::write_pending (std::deque<conf_height_
|
|||
// Do more thorough checking in Debug mode, indicates programming error.
|
||||
nano::block_sideband sideband;
|
||||
auto block = store.block_get (transaction, pending.hash, &sideband);
|
||||
assert (block != nullptr);
|
||||
assert (sideband.height == pending.height);
|
||||
static nano::network_constants network_constants;
|
||||
assert (network_constants.is_test_network () || block != nullptr);
|
||||
assert (network_constants.is_test_network () || sideband.height == pending.height);
|
||||
#else
|
||||
auto block = store.block_get (transaction, pending.hash);
|
||||
#endif
|
||||
|
@ -288,6 +294,9 @@ bool nano::confirmation_height_processor::write_pending (std::deque<conf_height_
|
|||
{
|
||||
logger.always_log ("Failed to write confirmation height for: ", pending.hash.to_string ());
|
||||
stats.inc (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block);
|
||||
receive_source_pairs.clear ();
|
||||
receive_source_pairs_size = 0;
|
||||
all_pending_a.clear ();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -307,6 +316,7 @@ bool nano::confirmation_height_processor::write_pending (std::deque<conf_height_
|
|||
}
|
||||
}
|
||||
}
|
||||
assert (all_pending_a.empty ());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -363,7 +373,8 @@ void nano::confirmation_height_processor::collect_unconfirmed_receive_and_source
|
|||
// Update the number of blocks confirmed by the last receive block
|
||||
if (!receive_source_pairs.empty ())
|
||||
{
|
||||
receive_source_pairs.back ().receive_details.num_blocks_confirmed = receive_source_pairs.back ().receive_details.height - confirmation_height_a;
|
||||
auto & last_receive_details = receive_source_pairs.back ().receive_details;
|
||||
last_receive_details.num_blocks_confirmed = last_receive_details.height - confirmation_height_a;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -383,6 +394,11 @@ source_hash (source_a)
|
|||
{
|
||||
}
|
||||
|
||||
confirmation_height_processor::confirmed_iterated_pair::confirmed_iterated_pair (uint64_t confirmed_height_a, uint64_t iterated_height_a) :
|
||||
confirmed_height (confirmed_height_a), iterated_height (iterated_height_a)
|
||||
{
|
||||
}
|
||||
|
||||
std::unique_ptr<seq_con_info_component> collect_seq_con_info (confirmation_height_processor & confirmation_height_processor_a, const std::string & name_a)
|
||||
{
|
||||
size_t receive_source_pairs_count = confirmation_height_processor_a.receive_source_pairs_size;
|
||||
|
|
|
@ -15,6 +15,7 @@ class stat;
|
|||
class active_transactions;
|
||||
class read_transaction;
|
||||
class logger_mt;
|
||||
class write_database_queue;
|
||||
|
||||
class pending_confirmation_height
|
||||
{
|
||||
|
@ -37,7 +38,7 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (pending_confirmati
|
|||
class confirmation_height_processor final
|
||||
{
|
||||
public:
|
||||
confirmation_height_processor (pending_confirmation_height &, nano::block_store &, nano::stat &, nano::active_transactions &, nano::block_hash const &, nano::logger_mt &);
|
||||
confirmation_height_processor (pending_confirmation_height &, nano::block_store &, nano::stat &, nano::active_transactions &, nano::block_hash const &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &);
|
||||
~confirmation_height_processor ();
|
||||
void add (nano::block_hash const &);
|
||||
void stop ();
|
||||
|
@ -69,6 +70,14 @@ private:
|
|||
nano::block_hash source_hash;
|
||||
};
|
||||
|
||||
class confirmed_iterated_pair
|
||||
{
|
||||
public:
|
||||
confirmed_iterated_pair (uint64_t confirmed_height_a, uint64_t iterated_height_a);
|
||||
uint64_t confirmed_height;
|
||||
uint64_t iterated_height;
|
||||
};
|
||||
|
||||
std::condition_variable condition;
|
||||
nano::pending_confirmation_height & pending_confirmations;
|
||||
std::atomic<bool> stopped{ false };
|
||||
|
@ -79,12 +88,20 @@ private:
|
|||
nano::logger_mt & logger;
|
||||
std::atomic<uint64_t> receive_source_pairs_size{ 0 };
|
||||
std::vector<receive_source_pair> receive_source_pairs;
|
||||
|
||||
std::deque<conf_height_details> pending_writes;
|
||||
// Store the highest confirmation heights for accounts in pending_writes to reduce unnecessary iterating,
|
||||
// and iterated height to prevent iterating over the same blocks more than once from self-sends or "circular" sends between the same accounts.
|
||||
std::unordered_map<account, confirmed_iterated_pair> confirmed_iterated_pairs;
|
||||
nano::timer<std::chrono::milliseconds> timer;
|
||||
nano::write_database_queue & write_database_queue;
|
||||
std::chrono::milliseconds batch_separate_pending_min_time;
|
||||
std::thread thread;
|
||||
|
||||
void run ();
|
||||
void add_confirmation_height (nano::block_hash const &);
|
||||
void collect_unconfirmed_receive_and_sources_for_account (uint64_t, uint64_t, nano::block_hash const &, nano::account const &, nano::read_transaction const &);
|
||||
bool write_pending (std::deque<conf_height_details> &, int64_t);
|
||||
bool write_pending (std::deque<conf_height_details> &);
|
||||
|
||||
friend std::unique_ptr<seq_con_info_component> collect_seq_con_info (confirmation_height_processor &, const std::string &);
|
||||
friend class confirmation_height_pending_observer_callbacks_Test;
|
||||
|
|
|
@ -218,7 +218,7 @@ port_mapping (*this),
|
|||
vote_processor (*this),
|
||||
rep_crawler (*this),
|
||||
warmed_up (0),
|
||||
block_processor (*this),
|
||||
block_processor (*this, write_database_queue),
|
||||
block_processor_thread ([this]() {
|
||||
nano::thread_role::set (nano::thread_role::name::block_processing);
|
||||
this->block_processor.process_blocks ();
|
||||
|
@ -226,7 +226,7 @@ block_processor_thread ([this]() {
|
|||
online_reps (*this, config.online_weight_minimum.number ()),
|
||||
vote_uniquer (block_uniquer),
|
||||
active (*this),
|
||||
confirmation_height_processor (pending_confirmation_height, store, ledger.stats, active, ledger.epoch_link, logger),
|
||||
confirmation_height_processor (pending_confirmation_height, store, ledger.stats, active, ledger.epoch_link, write_database_queue, config.conf_height_processor_batch_min_time, logger),
|
||||
payment_observer_processor (observers.blocks),
|
||||
wallets (init_a.wallets_store_init, *this),
|
||||
startup_time (std::chrono::steady_clock::now ())
|
||||
|
@ -744,6 +744,7 @@ void nano::node::stop ()
|
|||
checker.stop ();
|
||||
wallets.stop ();
|
||||
stats.stop ();
|
||||
write_database_queue.stop ();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include <nano/node/vote_processor.hpp>
|
||||
#include <nano/node/wallet.hpp>
|
||||
#include <nano/node/websocket.hpp>
|
||||
#include <nano/node/write_database_queue.hpp>
|
||||
#include <nano/secure/ledger.hpp>
|
||||
|
||||
#include <boost/asio/thread_pool.hpp>
|
||||
|
@ -159,6 +160,7 @@ public:
|
|||
void ongoing_online_weight_calculation ();
|
||||
void ongoing_online_weight_calculation_queue ();
|
||||
bool online () const;
|
||||
nano::write_database_queue write_database_queue;
|
||||
boost::asio::io_context & io_ctx;
|
||||
boost::latch node_initialized_latch;
|
||||
nano::network_params network_params;
|
||||
|
|
|
@ -253,6 +253,7 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso
|
|||
json.put ("confirmation_history_size", confirmation_history_size);
|
||||
json.put ("active_elections_size", active_elections_size);
|
||||
json.put ("bandwidth_limit", bandwidth_limit);
|
||||
json.put ("conf_height_processor_batch_min_time", conf_height_processor_batch_min_time.count ());
|
||||
}
|
||||
case 17:
|
||||
break;
|
||||
|
@ -404,7 +405,12 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco
|
|||
json.get<size_t> ("confirmation_history_size", confirmation_history_size);
|
||||
json.get<size_t> ("active_elections_size", active_elections_size);
|
||||
json.get<size_t> ("bandwidth_limit", bandwidth_limit);
|
||||
nano::network_params network;
|
||||
|
||||
auto conf_height_processor_batch_min_time_l (conf_height_processor_batch_min_time.count ());
|
||||
json.get ("conf_height_processor_batch_min_time", conf_height_processor_batch_min_time_l);
|
||||
conf_height_processor_batch_min_time = std::chrono::milliseconds (conf_height_processor_batch_min_time_l);
|
||||
|
||||
nano::network_constants network;
|
||||
// Validate ranges
|
||||
if (online_weight_quorum > 100)
|
||||
{
|
||||
|
@ -418,7 +424,7 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco
|
|||
{
|
||||
json.get_error ().set ("io_threads must be non-zero");
|
||||
}
|
||||
if (active_elections_size <= 250 && !network.network.is_test_network ())
|
||||
if (active_elections_size <= 250 && !network.is_test_network ())
|
||||
{
|
||||
json.get_error ().set ("active_elections_size must be grater than 250");
|
||||
}
|
||||
|
|
|
@ -75,6 +75,7 @@ public:
|
|||
static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5;
|
||||
static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5);
|
||||
size_t bandwidth_limit{ 1536 * 1024 };
|
||||
std::chrono::milliseconds conf_height_processor_batch_min_time{ 50 };
|
||||
static int json_version ()
|
||||
{
|
||||
return 17;
|
||||
|
|
83
nano/node/write_database_queue.cpp
Normal file
83
nano/node/write_database_queue.cpp
Normal file
|
@ -0,0 +1,83 @@
|
|||
#include <nano/node/write_database_queue.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
nano::write_guard::write_guard (std::condition_variable & cv_a, std::function<void()> guard_finish_callback_a) :
|
||||
cv (cv_a),
|
||||
guard_finish_callback (guard_finish_callback_a)
|
||||
{
|
||||
}
|
||||
|
||||
nano::write_guard::~write_guard ()
|
||||
{
|
||||
guard_finish_callback ();
|
||||
cv.notify_all ();
|
||||
}
|
||||
|
||||
nano::write_database_queue::write_database_queue () :
|
||||
// clang-format off
|
||||
guard_finish_callback ([&queue = queue, &mutex = mutex]() {
|
||||
std::lock_guard<std::mutex> guard (mutex);
|
||||
queue.pop_front ();
|
||||
})
|
||||
// clang-format on
|
||||
{
|
||||
}
|
||||
|
||||
nano::write_guard nano::write_database_queue::wait (nano::writer writer)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk (mutex);
|
||||
// Add writer to the end of the queue if it's not already waiting
|
||||
auto exists = std::find (queue.cbegin (), queue.cend (), writer) != queue.cend ();
|
||||
if (!exists)
|
||||
{
|
||||
queue.push_back (writer);
|
||||
}
|
||||
|
||||
while (!stopped && queue.front () != writer)
|
||||
{
|
||||
cv.wait (lk);
|
||||
}
|
||||
|
||||
return write_guard (cv, guard_finish_callback);
|
||||
}
|
||||
|
||||
bool nano::write_database_queue::contains (nano::writer writer)
|
||||
{
|
||||
std::lock_guard<std::mutex> guard (mutex);
|
||||
return std::find (queue.cbegin (), queue.cend (), writer) != queue.cend ();
|
||||
}
|
||||
|
||||
bool nano::write_database_queue::process (nano::writer writer)
|
||||
{
|
||||
auto result = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> guard (mutex);
|
||||
// Add writer to the end of the queue if it's not already waiting
|
||||
auto exists = std::find (queue.cbegin (), queue.cend (), writer) != queue.cend ();
|
||||
if (!exists)
|
||||
{
|
||||
queue.push_back (writer);
|
||||
}
|
||||
|
||||
result = (queue.front () == writer);
|
||||
}
|
||||
|
||||
if (!result)
|
||||
{
|
||||
cv.notify_all ();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
nano::write_guard nano::write_database_queue::pop ()
|
||||
{
|
||||
return write_guard (cv, guard_finish_callback);
|
||||
}
|
||||
|
||||
void nano::write_database_queue::stop ()
|
||||
{
|
||||
stopped = true;
|
||||
cv.notify_all ();
|
||||
}
|
55
nano/node/write_database_queue.hpp
Normal file
55
nano/node/write_database_queue.hpp
Normal file
|
@ -0,0 +1,55 @@
|
|||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
/** Distinct areas write locking is done, order is irrelevant */
|
||||
enum class writer
|
||||
{
|
||||
confirmation_height,
|
||||
process_batch
|
||||
};
|
||||
|
||||
class write_guard final
|
||||
{
|
||||
public:
|
||||
write_guard (std::condition_variable & cv_a, std::function<void()> guard_finish_callback_a);
|
||||
~write_guard ();
|
||||
|
||||
private:
|
||||
std::condition_variable & cv;
|
||||
std::function<void()> guard_finish_callback;
|
||||
};
|
||||
|
||||
class write_database_queue final
|
||||
{
|
||||
public:
|
||||
write_database_queue ();
|
||||
/** Blocks until we are at the head of the queue */
|
||||
write_guard wait (nano::writer writer);
|
||||
|
||||
/** Returns true if this writer is now at the front of the queue */
|
||||
bool process (nano::writer writer);
|
||||
|
||||
/** Returns true if this writer is anywhere in the queue */
|
||||
bool contains (nano::writer writer);
|
||||
|
||||
/** Doesn't actually pop anything until the returned write_guard is out of scope */
|
||||
write_guard pop ();
|
||||
|
||||
/** This will release anything which is being blocked by the wait function */
|
||||
void stop ();
|
||||
|
||||
private:
|
||||
std::deque<nano::writer> queue;
|
||||
std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
std::function<void()> guard_finish_callback;
|
||||
std::atomic<bool> stopped{ false };
|
||||
};
|
||||
}
|
|
@ -461,7 +461,7 @@ TEST (node, mass_vote_by_hash)
|
|||
}
|
||||
}
|
||||
|
||||
TEST (confirmation_height, many_accounts)
|
||||
TEST (confirmation_height, many_accounts_single_confirmation)
|
||||
{
|
||||
nano::system system;
|
||||
nano::node_config node_config (24000, system.logging);
|
||||
|
@ -531,6 +531,48 @@ TEST (confirmation_height, many_accounts)
|
|||
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), num_accounts * 2 - 2);
|
||||
}
|
||||
|
||||
TEST (confirmation_height, many_accounts_many_confirmations)
|
||||
{
|
||||
nano::system system;
|
||||
nano::node_config node_config (24000, system.logging);
|
||||
node_config.online_weight_minimum = 100;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.delay_frontier_confirmation_height_updating = true;
|
||||
auto node = system.add_node (node_config, node_flags);
|
||||
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
|
||||
|
||||
auto num_accounts = 10000;
|
||||
auto latest_genesis = node->latest (nano::test_genesis_key.pub);
|
||||
std::vector<std::shared_ptr<nano::open_block>> open_blocks;
|
||||
{
|
||||
auto transaction = node->store.tx_begin_write ();
|
||||
for (auto i = num_accounts - 1; i > 0; --i)
|
||||
{
|
||||
nano::keypair key;
|
||||
system.wallet (0)->insert_adhoc (key.prv);
|
||||
|
||||
nano::send_block send (latest_genesis, key.pub, nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest_genesis));
|
||||
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code);
|
||||
auto open = std::make_shared<nano::open_block> (send.hash (), nano::test_genesis_key.pub, key.pub, key.prv, key.pub, system.work.generate (key.pub));
|
||||
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *open).code);
|
||||
open_blocks.push_back (std::move (open));
|
||||
latest_genesis = send.hash ();
|
||||
}
|
||||
}
|
||||
|
||||
// Confirm all of the accounts
|
||||
for (auto & open_block : open_blocks)
|
||||
{
|
||||
node->block_confirm (open_block);
|
||||
}
|
||||
|
||||
system.deadline_set (60s);
|
||||
while (node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in) != (num_accounts - 1) * 2)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
|
||||
TEST (confirmation_height, long_chains)
|
||||
{
|
||||
nano::system system;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue