Merge pull request #4787 from pwojcikdev/bounded-backlog-pr

Bounded backlog
This commit is contained in:
Piotr Wójcik 2024-11-29 16:32:35 +01:00 committed by GitHub
commit d63dd09692
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 1125 additions and 250 deletions

View file

@ -21,6 +21,7 @@
#include <nano/secure/ledger_set_any.hpp>
#include <nano/secure/ledger_set_confirmed.hpp>
#include <nano/secure/vote.hpp>
#include <nano/test_common/chains.hpp>
#include <nano/test_common/network.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
@ -3698,3 +3699,22 @@ TEST (node, container_info)
ASSERT_NO_THROW (node1.container_info ());
ASSERT_NO_THROW (node2.container_info ());
}
TEST (node, bounded_backlog)
{
nano::test::system system;
nano::node_config node_config;
node_config.max_backlog = 10;
node_config.backlog_scan.enable = false;
auto & node = *system.add_node (node_config);
const int howmany_blocks = 64;
const int howmany_chains = 16;
auto chains = nano::test::setup_chains (system, node, howmany_chains, howmany_blocks, nano::dev::genesis_key, /* do not confirm */ false);
node.backlog_scan.trigger ();
ASSERT_TIMELY_EQ (20s, node.ledger.block_count (), 11); // 10 + genesis
}

View file

@ -67,233 +67,6 @@ TEST (toml, diff_equal)
ASSERT_TRUE (other.empty ());
}
TEST (toml, daemon_config_update_array)
{
nano::tomlconfig t;
std::filesystem::path data_path (".");
nano::daemon_config c{ data_path, nano::dev::network_params };
c.node.preconfigured_peers.push_back ("dev-peer.org");
c.serialize_toml (t);
c.deserialize_toml (t);
ASSERT_EQ (c.node.preconfigured_peers[0], "dev-peer.org");
}
/** Empty rpc config file should match a default config object */
TEST (toml, rpc_config_deserialize_defaults)
{
std::stringstream ss;
// A config file with values that differs from devnet defaults
ss << R"toml(
[process]
)toml";
nano::tomlconfig t;
t.read (ss);
nano::rpc_config conf{ nano::dev::network_params.network };
nano::rpc_config defaults{ nano::dev::network_params.network };
conf.deserialize_toml (t);
ASSERT_FALSE (t.get_error ()) << t.get_error ().get_message ();
ASSERT_EQ (conf.address, defaults.address);
ASSERT_EQ (conf.enable_control, defaults.enable_control);
ASSERT_EQ (conf.max_json_depth, defaults.max_json_depth);
ASSERT_EQ (conf.max_request_size, defaults.max_request_size);
ASSERT_EQ (conf.port, defaults.port);
ASSERT_EQ (conf.rpc_process.io_threads, defaults.rpc_process.io_threads);
ASSERT_EQ (conf.rpc_process.ipc_address, defaults.rpc_process.ipc_address);
ASSERT_EQ (conf.rpc_process.ipc_port, defaults.rpc_process.ipc_port);
ASSERT_EQ (conf.rpc_process.num_ipc_connections, defaults.rpc_process.num_ipc_connections);
ASSERT_EQ (conf.rpc_logging.log_rpc, defaults.rpc_logging.log_rpc);
}
/** Empty config file should match a default config object */
TEST (toml, daemon_config_deserialize_defaults)
{
std::stringstream ss;
ss << R"toml(
[node]
[node.backlog_scan]
[node.bootstrap]
[node.bootstrap_server]
[node.block_processor]
[node.diagnostics.txn_tracking]
[node.httpcallback]
[node.ipc.local]
[node.ipc.tcp]
[node.logging]
[node.statistics.log]
[node.statistics.sampling]
[node.vote_processor]
[node.websocket]
[node.lmdb]
[node.rocksdb]
[opencl]
[rpc]
[rpc.child_process]
)toml";
nano::tomlconfig t;
t.read (ss);
nano::daemon_config conf;
nano::daemon_config defaults;
conf.deserialize_toml (t);
ASSERT_FALSE (t.get_error ()) << t.get_error ().get_message ();
ASSERT_EQ (conf.opencl_enable, defaults.opencl_enable);
ASSERT_EQ (conf.opencl.device, defaults.opencl.device);
ASSERT_EQ (conf.opencl.platform, defaults.opencl.platform);
ASSERT_EQ (conf.opencl.threads, defaults.opencl.threads);
ASSERT_EQ (conf.rpc_enable, defaults.rpc_enable);
ASSERT_EQ (conf.rpc.enable_sign_hash, defaults.rpc.enable_sign_hash);
ASSERT_EQ (conf.rpc.child_process.enable, defaults.rpc.child_process.enable);
ASSERT_EQ (conf.rpc.child_process.rpc_path, defaults.rpc.child_process.rpc_path);
ASSERT_EQ (conf.node.active_elections.size, defaults.node.active_elections.size);
ASSERT_EQ (conf.node.allow_local_peers, defaults.node.allow_local_peers);
ASSERT_EQ (conf.node.backup_before_upgrade, defaults.node.backup_before_upgrade);
ASSERT_EQ (conf.node.bandwidth_limit, defaults.node.bandwidth_limit);
ASSERT_EQ (conf.node.bandwidth_limit_burst_ratio, defaults.node.bandwidth_limit_burst_ratio);
ASSERT_EQ (conf.node.bootstrap_bandwidth_limit, defaults.node.bootstrap_bandwidth_limit);
ASSERT_EQ (conf.node.bootstrap_bandwidth_burst_ratio, defaults.node.bootstrap_bandwidth_burst_ratio);
ASSERT_EQ (conf.node.block_processor_batch_max_time, defaults.node.block_processor_batch_max_time);
ASSERT_EQ (conf.node.bootstrap_connections, defaults.node.bootstrap_connections);
ASSERT_EQ (conf.node.bootstrap_connections_max, defaults.node.bootstrap_connections_max);
ASSERT_EQ (conf.node.bootstrap_initiator_threads, defaults.node.bootstrap_initiator_threads);
ASSERT_EQ (conf.node.bootstrap_serving_threads, defaults.node.bootstrap_serving_threads);
ASSERT_EQ (conf.node.bootstrap_frontier_request_count, defaults.node.bootstrap_frontier_request_count);
ASSERT_EQ (conf.node.bootstrap_fraction_numerator, defaults.node.bootstrap_fraction_numerator);
ASSERT_EQ (conf.node.confirming_set_batch_time, defaults.node.confirming_set_batch_time);
ASSERT_EQ (conf.node.enable_voting, defaults.node.enable_voting);
ASSERT_EQ (conf.node.external_address, defaults.node.external_address);
ASSERT_EQ (conf.node.external_port, defaults.node.external_port);
ASSERT_EQ (conf.node.io_threads, defaults.node.io_threads);
ASSERT_EQ (conf.node.max_work_generate_multiplier, defaults.node.max_work_generate_multiplier);
ASSERT_EQ (conf.node.network_threads, defaults.node.network_threads);
ASSERT_EQ (conf.node.background_threads, defaults.node.background_threads);
ASSERT_EQ (conf.node.secondary_work_peers, defaults.node.secondary_work_peers);
ASSERT_EQ (conf.node.online_weight_minimum, defaults.node.online_weight_minimum);
ASSERT_EQ (conf.node.representative_vote_weight_minimum, defaults.node.representative_vote_weight_minimum);
ASSERT_EQ (conf.node.rep_crawler_weight_minimum, defaults.node.rep_crawler_weight_minimum);
ASSERT_EQ (conf.node.password_fanout, defaults.node.password_fanout);
ASSERT_EQ (conf.node.peering_port, defaults.node.peering_port);
ASSERT_EQ (conf.node.pow_sleep_interval, defaults.node.pow_sleep_interval);
ASSERT_EQ (conf.node.preconfigured_peers, defaults.node.preconfigured_peers);
ASSERT_EQ (conf.node.preconfigured_representatives, defaults.node.preconfigured_representatives);
ASSERT_EQ (conf.node.receive_minimum, defaults.node.receive_minimum);
ASSERT_EQ (conf.node.signature_checker_threads, defaults.node.signature_checker_threads);
ASSERT_EQ (conf.node.tcp_incoming_connections_max, defaults.node.tcp_incoming_connections_max);
ASSERT_EQ (conf.node.tcp_io_timeout, defaults.node.tcp_io_timeout);
ASSERT_EQ (conf.node.unchecked_cutoff_time, defaults.node.unchecked_cutoff_time);
ASSERT_EQ (conf.node.use_memory_pools, defaults.node.use_memory_pools);
ASSERT_EQ (conf.node.vote_generator_delay, defaults.node.vote_generator_delay);
ASSERT_EQ (conf.node.vote_minimum, defaults.node.vote_minimum);
ASSERT_EQ (conf.node.work_peers, defaults.node.work_peers);
ASSERT_EQ (conf.node.work_threads, defaults.node.work_threads);
ASSERT_EQ (conf.node.max_queued_requests, defaults.node.max_queued_requests);
ASSERT_EQ (conf.node.request_aggregator_threads, defaults.node.request_aggregator_threads);
ASSERT_EQ (conf.node.max_unchecked_blocks, defaults.node.max_unchecked_blocks);
ASSERT_EQ (conf.node.backlog_scan.enable, defaults.node.backlog_scan.enable);
ASSERT_EQ (conf.node.backlog_scan.batch_size, defaults.node.backlog_scan.batch_size);
ASSERT_EQ (conf.node.backlog_scan.rate_limit, defaults.node.backlog_scan.rate_limit);
ASSERT_EQ (conf.node.enable_upnp, defaults.node.enable_upnp);
ASSERT_EQ (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled);
ASSERT_EQ (conf.node.websocket_config.address, defaults.node.websocket_config.address);
ASSERT_EQ (conf.node.websocket_config.port, defaults.node.websocket_config.port);
ASSERT_EQ (conf.node.callback_address, defaults.node.callback_address);
ASSERT_EQ (conf.node.callback_port, defaults.node.callback_port);
ASSERT_EQ (conf.node.callback_target, defaults.node.callback_target);
ASSERT_EQ (conf.node.ipc_config.transport_domain.allow_unsafe, defaults.node.ipc_config.transport_domain.allow_unsafe);
ASSERT_EQ (conf.node.ipc_config.transport_domain.enabled, defaults.node.ipc_config.transport_domain.enabled);
ASSERT_EQ (conf.node.ipc_config.transport_domain.io_timeout, defaults.node.ipc_config.transport_domain.io_timeout);
ASSERT_EQ (conf.node.ipc_config.transport_domain.io_threads, defaults.node.ipc_config.transport_domain.io_threads);
ASSERT_EQ (conf.node.ipc_config.transport_domain.path, defaults.node.ipc_config.transport_domain.path);
ASSERT_EQ (conf.node.ipc_config.transport_tcp.enabled, defaults.node.ipc_config.transport_tcp.enabled);
ASSERT_EQ (conf.node.ipc_config.transport_tcp.io_timeout, defaults.node.ipc_config.transport_tcp.io_timeout);
ASSERT_EQ (conf.node.ipc_config.transport_tcp.io_threads, defaults.node.ipc_config.transport_tcp.io_threads);
ASSERT_EQ (conf.node.ipc_config.transport_tcp.port, defaults.node.ipc_config.transport_tcp.port);
ASSERT_EQ (conf.node.ipc_config.flatbuffers.skip_unexpected_fields_in_json, defaults.node.ipc_config.flatbuffers.skip_unexpected_fields_in_json);
ASSERT_EQ (conf.node.ipc_config.flatbuffers.verify_buffers, defaults.node.ipc_config.flatbuffers.verify_buffers);
ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.enable, defaults.node.diagnostics_config.txn_tracking.enable);
ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time, defaults.node.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time);
ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.min_read_txn_time, defaults.node.diagnostics_config.txn_tracking.min_read_txn_time);
ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.min_write_txn_time, defaults.node.diagnostics_config.txn_tracking.min_write_txn_time);
ASSERT_EQ (conf.node.stats_config.max_samples, defaults.node.stats_config.max_samples);
ASSERT_EQ (conf.node.stats_config.log_rotation_count, defaults.node.stats_config.log_rotation_count);
ASSERT_EQ (conf.node.stats_config.log_samples_interval, defaults.node.stats_config.log_samples_interval);
ASSERT_EQ (conf.node.stats_config.log_counters_interval, defaults.node.stats_config.log_counters_interval);
ASSERT_EQ (conf.node.stats_config.log_headers, defaults.node.stats_config.log_headers);
ASSERT_EQ (conf.node.stats_config.log_counters_filename, defaults.node.stats_config.log_counters_filename);
ASSERT_EQ (conf.node.stats_config.log_samples_filename, defaults.node.stats_config.log_samples_filename);
ASSERT_EQ (conf.node.lmdb_config.sync, defaults.node.lmdb_config.sync);
ASSERT_EQ (conf.node.lmdb_config.max_databases, defaults.node.lmdb_config.max_databases);
ASSERT_EQ (conf.node.lmdb_config.map_size, defaults.node.lmdb_config.map_size);
ASSERT_EQ (conf.node.rocksdb_config.enable, defaults.node.rocksdb_config.enable);
ASSERT_EQ (conf.node.rocksdb_config.io_threads, defaults.node.rocksdb_config.io_threads);
ASSERT_EQ (conf.node.rocksdb_config.read_cache, defaults.node.rocksdb_config.read_cache);
ASSERT_EQ (conf.node.rocksdb_config.write_cache, defaults.node.rocksdb_config.write_cache);
ASSERT_EQ (conf.node.optimistic_scheduler.enable, defaults.node.optimistic_scheduler.enable);
ASSERT_EQ (conf.node.optimistic_scheduler.gap_threshold, defaults.node.optimistic_scheduler.gap_threshold);
ASSERT_EQ (conf.node.optimistic_scheduler.max_size, defaults.node.optimistic_scheduler.max_size);
ASSERT_EQ (conf.node.hinted_scheduler.enable, defaults.node.hinted_scheduler.enable);
ASSERT_EQ (conf.node.hinted_scheduler.hinting_threshold_percent, defaults.node.hinted_scheduler.hinting_threshold_percent);
ASSERT_EQ (conf.node.hinted_scheduler.check_interval.count (), defaults.node.hinted_scheduler.check_interval.count ());
ASSERT_EQ (conf.node.hinted_scheduler.block_cooldown.count (), defaults.node.hinted_scheduler.block_cooldown.count ());
ASSERT_EQ (conf.node.hinted_scheduler.vacancy_threshold_percent, defaults.node.hinted_scheduler.vacancy_threshold_percent);
ASSERT_EQ (conf.node.vote_cache.max_size, defaults.node.vote_cache.max_size);
ASSERT_EQ (conf.node.vote_cache.max_voters, defaults.node.vote_cache.max_voters);
ASSERT_EQ (conf.node.block_processor.max_peer_queue, defaults.node.block_processor.max_peer_queue);
ASSERT_EQ (conf.node.block_processor.max_system_queue, defaults.node.block_processor.max_system_queue);
ASSERT_EQ (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live);
ASSERT_EQ (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap);
ASSERT_EQ (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local);
ASSERT_EQ (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue);
ASSERT_EQ (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue);
ASSERT_EQ (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority);
ASSERT_EQ (conf.node.vote_processor.threads, defaults.node.vote_processor.threads);
ASSERT_EQ (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size);
ASSERT_EQ (conf.node.bootstrap.enable, defaults.node.bootstrap.enable);
ASSERT_EQ (conf.node.bootstrap.enable_database_scan, defaults.node.bootstrap.enable_database_scan);
ASSERT_EQ (conf.node.bootstrap.enable_dependency_walker, defaults.node.bootstrap.enable_dependency_walker);
ASSERT_EQ (conf.node.bootstrap.channel_limit, defaults.node.bootstrap.channel_limit);
ASSERT_EQ (conf.node.bootstrap.database_rate_limit, defaults.node.bootstrap.database_rate_limit);
ASSERT_EQ (conf.node.bootstrap.database_warmup_ratio, defaults.node.bootstrap.database_warmup_ratio);
ASSERT_EQ (conf.node.bootstrap.max_pull_count, defaults.node.bootstrap.max_pull_count);
ASSERT_EQ (conf.node.bootstrap.request_timeout, defaults.node.bootstrap.request_timeout);
ASSERT_EQ (conf.node.bootstrap.throttle_coefficient, defaults.node.bootstrap.throttle_coefficient);
ASSERT_EQ (conf.node.bootstrap.throttle_wait, defaults.node.bootstrap.throttle_wait);
ASSERT_EQ (conf.node.bootstrap.block_processor_threshold, defaults.node.bootstrap.block_processor_threshold);
ASSERT_EQ (conf.node.bootstrap.max_requests, defaults.node.bootstrap.max_requests);
ASSERT_EQ (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue);
ASSERT_EQ (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads);
ASSERT_EQ (conf.node.bootstrap_server.batch_size, defaults.node.bootstrap_server.batch_size);
ASSERT_EQ (conf.node.request_aggregator.max_queue, defaults.node.request_aggregator.max_queue);
ASSERT_EQ (conf.node.request_aggregator.threads, defaults.node.request_aggregator.threads);
ASSERT_EQ (conf.node.request_aggregator.batch_size, defaults.node.request_aggregator.batch_size);
ASSERT_EQ (conf.node.message_processor.threads, defaults.node.message_processor.threads);
ASSERT_EQ (conf.node.message_processor.max_queue, defaults.node.message_processor.max_queue);
}
TEST (toml, optional_child)
{
std::stringstream ss;
@ -412,8 +185,243 @@ TEST (toml, array)
});
}
TEST (toml_config, daemon_config_update_array)
{
nano::tomlconfig t;
std::filesystem::path data_path (".");
nano::daemon_config c{ data_path, nano::dev::network_params };
c.node.preconfigured_peers.push_back ("dev-peer.org");
c.serialize_toml (t);
c.deserialize_toml (t);
ASSERT_EQ (c.node.preconfigured_peers[0], "dev-peer.org");
}
/** Empty rpc config file should match a default config object */
TEST (toml_config, rpc_config_deserialize_defaults)
{
std::stringstream ss;
// A config file with values that differs from devnet defaults
ss << R"toml(
[process]
)toml";
nano::tomlconfig t;
t.read (ss);
nano::rpc_config conf{ nano::dev::network_params.network };
nano::rpc_config defaults{ nano::dev::network_params.network };
conf.deserialize_toml (t);
ASSERT_FALSE (t.get_error ()) << t.get_error ().get_message ();
ASSERT_EQ (conf.address, defaults.address);
ASSERT_EQ (conf.enable_control, defaults.enable_control);
ASSERT_EQ (conf.max_json_depth, defaults.max_json_depth);
ASSERT_EQ (conf.max_request_size, defaults.max_request_size);
ASSERT_EQ (conf.port, defaults.port);
ASSERT_EQ (conf.rpc_process.io_threads, defaults.rpc_process.io_threads);
ASSERT_EQ (conf.rpc_process.ipc_address, defaults.rpc_process.ipc_address);
ASSERT_EQ (conf.rpc_process.ipc_port, defaults.rpc_process.ipc_port);
ASSERT_EQ (conf.rpc_process.num_ipc_connections, defaults.rpc_process.num_ipc_connections);
ASSERT_EQ (conf.rpc_logging.log_rpc, defaults.rpc_logging.log_rpc);
}
/** Empty config file should match a default config object */
TEST (toml_config, daemon_config_deserialize_defaults)
{
std::stringstream ss;
ss << R"toml(
[node]
[node.backlog_scan]
[node.bounded_backlog]
[node.bootstrap]
[node.bootstrap_server]
[node.block_processor]
[node.diagnostics.txn_tracking]
[node.httpcallback]
[node.ipc.local]
[node.ipc.tcp]
[node.logging]
[node.statistics.log]
[node.statistics.sampling]
[node.vote_processor]
[node.websocket]
[node.lmdb]
[node.rocksdb]
[opencl]
[rpc]
[rpc.child_process]
)toml";
nano::tomlconfig t;
t.read (ss);
nano::daemon_config conf;
nano::daemon_config defaults;
conf.deserialize_toml (t);
ASSERT_FALSE (t.get_error ()) << t.get_error ().get_message ();
ASSERT_EQ (conf.opencl_enable, defaults.opencl_enable);
ASSERT_EQ (conf.opencl.device, defaults.opencl.device);
ASSERT_EQ (conf.opencl.platform, defaults.opencl.platform);
ASSERT_EQ (conf.opencl.threads, defaults.opencl.threads);
ASSERT_EQ (conf.rpc_enable, defaults.rpc_enable);
ASSERT_EQ (conf.rpc.enable_sign_hash, defaults.rpc.enable_sign_hash);
ASSERT_EQ (conf.rpc.child_process.enable, defaults.rpc.child_process.enable);
ASSERT_EQ (conf.rpc.child_process.rpc_path, defaults.rpc.child_process.rpc_path);
ASSERT_EQ (conf.node.active_elections.size, defaults.node.active_elections.size);
ASSERT_EQ (conf.node.allow_local_peers, defaults.node.allow_local_peers);
ASSERT_EQ (conf.node.backup_before_upgrade, defaults.node.backup_before_upgrade);
ASSERT_EQ (conf.node.bandwidth_limit, defaults.node.bandwidth_limit);
ASSERT_EQ (conf.node.bandwidth_limit_burst_ratio, defaults.node.bandwidth_limit_burst_ratio);
ASSERT_EQ (conf.node.bootstrap_bandwidth_limit, defaults.node.bootstrap_bandwidth_limit);
ASSERT_EQ (conf.node.bootstrap_bandwidth_burst_ratio, defaults.node.bootstrap_bandwidth_burst_ratio);
ASSERT_EQ (conf.node.block_processor_batch_max_time, defaults.node.block_processor_batch_max_time);
ASSERT_EQ (conf.node.bootstrap_connections, defaults.node.bootstrap_connections);
ASSERT_EQ (conf.node.bootstrap_connections_max, defaults.node.bootstrap_connections_max);
ASSERT_EQ (conf.node.bootstrap_initiator_threads, defaults.node.bootstrap_initiator_threads);
ASSERT_EQ (conf.node.bootstrap_serving_threads, defaults.node.bootstrap_serving_threads);
ASSERT_EQ (conf.node.bootstrap_frontier_request_count, defaults.node.bootstrap_frontier_request_count);
ASSERT_EQ (conf.node.bootstrap_fraction_numerator, defaults.node.bootstrap_fraction_numerator);
ASSERT_EQ (conf.node.confirming_set_batch_time, defaults.node.confirming_set_batch_time);
ASSERT_EQ (conf.node.enable_voting, defaults.node.enable_voting);
ASSERT_EQ (conf.node.external_address, defaults.node.external_address);
ASSERT_EQ (conf.node.external_port, defaults.node.external_port);
ASSERT_EQ (conf.node.io_threads, defaults.node.io_threads);
ASSERT_EQ (conf.node.max_work_generate_multiplier, defaults.node.max_work_generate_multiplier);
ASSERT_EQ (conf.node.network_threads, defaults.node.network_threads);
ASSERT_EQ (conf.node.background_threads, defaults.node.background_threads);
ASSERT_EQ (conf.node.secondary_work_peers, defaults.node.secondary_work_peers);
ASSERT_EQ (conf.node.online_weight_minimum, defaults.node.online_weight_minimum);
ASSERT_EQ (conf.node.representative_vote_weight_minimum, defaults.node.representative_vote_weight_minimum);
ASSERT_EQ (conf.node.rep_crawler_weight_minimum, defaults.node.rep_crawler_weight_minimum);
ASSERT_EQ (conf.node.password_fanout, defaults.node.password_fanout);
ASSERT_EQ (conf.node.peering_port, defaults.node.peering_port);
ASSERT_EQ (conf.node.pow_sleep_interval, defaults.node.pow_sleep_interval);
ASSERT_EQ (conf.node.preconfigured_peers, defaults.node.preconfigured_peers);
ASSERT_EQ (conf.node.preconfigured_representatives, defaults.node.preconfigured_representatives);
ASSERT_EQ (conf.node.receive_minimum, defaults.node.receive_minimum);
ASSERT_EQ (conf.node.signature_checker_threads, defaults.node.signature_checker_threads);
ASSERT_EQ (conf.node.tcp_incoming_connections_max, defaults.node.tcp_incoming_connections_max);
ASSERT_EQ (conf.node.tcp_io_timeout, defaults.node.tcp_io_timeout);
ASSERT_EQ (conf.node.unchecked_cutoff_time, defaults.node.unchecked_cutoff_time);
ASSERT_EQ (conf.node.use_memory_pools, defaults.node.use_memory_pools);
ASSERT_EQ (conf.node.vote_generator_delay, defaults.node.vote_generator_delay);
ASSERT_EQ (conf.node.vote_minimum, defaults.node.vote_minimum);
ASSERT_EQ (conf.node.work_peers, defaults.node.work_peers);
ASSERT_EQ (conf.node.work_threads, defaults.node.work_threads);
ASSERT_EQ (conf.node.max_queued_requests, defaults.node.max_queued_requests);
ASSERT_EQ (conf.node.request_aggregator_threads, defaults.node.request_aggregator_threads);
ASSERT_EQ (conf.node.max_unchecked_blocks, defaults.node.max_unchecked_blocks);
ASSERT_EQ (conf.node.max_backlog, defaults.node.max_backlog);
ASSERT_EQ (conf.node.enable_upnp, defaults.node.enable_upnp);
ASSERT_EQ (conf.node.backlog_scan.enable, defaults.node.backlog_scan.enable);
ASSERT_EQ (conf.node.backlog_scan.batch_size, defaults.node.backlog_scan.batch_size);
ASSERT_EQ (conf.node.backlog_scan.rate_limit, defaults.node.backlog_scan.rate_limit);
ASSERT_EQ (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable);
ASSERT_EQ (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size);
ASSERT_EQ (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications);
ASSERT_EQ (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate);
ASSERT_EQ (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled);
ASSERT_EQ (conf.node.websocket_config.address, defaults.node.websocket_config.address);
ASSERT_EQ (conf.node.websocket_config.port, defaults.node.websocket_config.port);
ASSERT_EQ (conf.node.callback_address, defaults.node.callback_address);
ASSERT_EQ (conf.node.callback_port, defaults.node.callback_port);
ASSERT_EQ (conf.node.callback_target, defaults.node.callback_target);
ASSERT_EQ (conf.node.ipc_config.transport_domain.allow_unsafe, defaults.node.ipc_config.transport_domain.allow_unsafe);
ASSERT_EQ (conf.node.ipc_config.transport_domain.enabled, defaults.node.ipc_config.transport_domain.enabled);
ASSERT_EQ (conf.node.ipc_config.transport_domain.io_timeout, defaults.node.ipc_config.transport_domain.io_timeout);
ASSERT_EQ (conf.node.ipc_config.transport_domain.io_threads, defaults.node.ipc_config.transport_domain.io_threads);
ASSERT_EQ (conf.node.ipc_config.transport_domain.path, defaults.node.ipc_config.transport_domain.path);
ASSERT_EQ (conf.node.ipc_config.transport_tcp.enabled, defaults.node.ipc_config.transport_tcp.enabled);
ASSERT_EQ (conf.node.ipc_config.transport_tcp.io_timeout, defaults.node.ipc_config.transport_tcp.io_timeout);
ASSERT_EQ (conf.node.ipc_config.transport_tcp.io_threads, defaults.node.ipc_config.transport_tcp.io_threads);
ASSERT_EQ (conf.node.ipc_config.transport_tcp.port, defaults.node.ipc_config.transport_tcp.port);
ASSERT_EQ (conf.node.ipc_config.flatbuffers.skip_unexpected_fields_in_json, defaults.node.ipc_config.flatbuffers.skip_unexpected_fields_in_json);
ASSERT_EQ (conf.node.ipc_config.flatbuffers.verify_buffers, defaults.node.ipc_config.flatbuffers.verify_buffers);
ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.enable, defaults.node.diagnostics_config.txn_tracking.enable);
ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time, defaults.node.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time);
ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.min_read_txn_time, defaults.node.diagnostics_config.txn_tracking.min_read_txn_time);
ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.min_write_txn_time, defaults.node.diagnostics_config.txn_tracking.min_write_txn_time);
ASSERT_EQ (conf.node.stats_config.max_samples, defaults.node.stats_config.max_samples);
ASSERT_EQ (conf.node.stats_config.log_rotation_count, defaults.node.stats_config.log_rotation_count);
ASSERT_EQ (conf.node.stats_config.log_samples_interval, defaults.node.stats_config.log_samples_interval);
ASSERT_EQ (conf.node.stats_config.log_counters_interval, defaults.node.stats_config.log_counters_interval);
ASSERT_EQ (conf.node.stats_config.log_headers, defaults.node.stats_config.log_headers);
ASSERT_EQ (conf.node.stats_config.log_counters_filename, defaults.node.stats_config.log_counters_filename);
ASSERT_EQ (conf.node.stats_config.log_samples_filename, defaults.node.stats_config.log_samples_filename);
ASSERT_EQ (conf.node.lmdb_config.sync, defaults.node.lmdb_config.sync);
ASSERT_EQ (conf.node.lmdb_config.max_databases, defaults.node.lmdb_config.max_databases);
ASSERT_EQ (conf.node.lmdb_config.map_size, defaults.node.lmdb_config.map_size);
ASSERT_EQ (conf.node.rocksdb_config.enable, defaults.node.rocksdb_config.enable);
ASSERT_EQ (conf.node.rocksdb_config.io_threads, defaults.node.rocksdb_config.io_threads);
ASSERT_EQ (conf.node.rocksdb_config.read_cache, defaults.node.rocksdb_config.read_cache);
ASSERT_EQ (conf.node.rocksdb_config.write_cache, defaults.node.rocksdb_config.write_cache);
ASSERT_EQ (conf.node.optimistic_scheduler.enable, defaults.node.optimistic_scheduler.enable);
ASSERT_EQ (conf.node.optimistic_scheduler.gap_threshold, defaults.node.optimistic_scheduler.gap_threshold);
ASSERT_EQ (conf.node.optimistic_scheduler.max_size, defaults.node.optimistic_scheduler.max_size);
ASSERT_EQ (conf.node.hinted_scheduler.enable, defaults.node.hinted_scheduler.enable);
ASSERT_EQ (conf.node.hinted_scheduler.hinting_threshold_percent, defaults.node.hinted_scheduler.hinting_threshold_percent);
ASSERT_EQ (conf.node.hinted_scheduler.check_interval.count (), defaults.node.hinted_scheduler.check_interval.count ());
ASSERT_EQ (conf.node.hinted_scheduler.block_cooldown.count (), defaults.node.hinted_scheduler.block_cooldown.count ());
ASSERT_EQ (conf.node.hinted_scheduler.vacancy_threshold_percent, defaults.node.hinted_scheduler.vacancy_threshold_percent);
ASSERT_EQ (conf.node.vote_cache.max_size, defaults.node.vote_cache.max_size);
ASSERT_EQ (conf.node.vote_cache.max_voters, defaults.node.vote_cache.max_voters);
ASSERT_EQ (conf.node.block_processor.max_peer_queue, defaults.node.block_processor.max_peer_queue);
ASSERT_EQ (conf.node.block_processor.max_system_queue, defaults.node.block_processor.max_system_queue);
ASSERT_EQ (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live);
ASSERT_EQ (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap);
ASSERT_EQ (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local);
ASSERT_EQ (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue);
ASSERT_EQ (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue);
ASSERT_EQ (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority);
ASSERT_EQ (conf.node.vote_processor.threads, defaults.node.vote_processor.threads);
ASSERT_EQ (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size);
ASSERT_EQ (conf.node.bootstrap.enable, defaults.node.bootstrap.enable);
ASSERT_EQ (conf.node.bootstrap.enable_database_scan, defaults.node.bootstrap.enable_database_scan);
ASSERT_EQ (conf.node.bootstrap.enable_dependency_walker, defaults.node.bootstrap.enable_dependency_walker);
ASSERT_EQ (conf.node.bootstrap.channel_limit, defaults.node.bootstrap.channel_limit);
ASSERT_EQ (conf.node.bootstrap.database_rate_limit, defaults.node.bootstrap.database_rate_limit);
ASSERT_EQ (conf.node.bootstrap.database_warmup_ratio, defaults.node.bootstrap.database_warmup_ratio);
ASSERT_EQ (conf.node.bootstrap.max_pull_count, defaults.node.bootstrap.max_pull_count);
ASSERT_EQ (conf.node.bootstrap.request_timeout, defaults.node.bootstrap.request_timeout);
ASSERT_EQ (conf.node.bootstrap.throttle_coefficient, defaults.node.bootstrap.throttle_coefficient);
ASSERT_EQ (conf.node.bootstrap.throttle_wait, defaults.node.bootstrap.throttle_wait);
ASSERT_EQ (conf.node.bootstrap.block_processor_threshold, defaults.node.bootstrap.block_processor_threshold);
ASSERT_EQ (conf.node.bootstrap.max_requests, defaults.node.bootstrap.max_requests);
ASSERT_EQ (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue);
ASSERT_EQ (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads);
ASSERT_EQ (conf.node.bootstrap_server.batch_size, defaults.node.bootstrap_server.batch_size);
ASSERT_EQ (conf.node.request_aggregator.max_queue, defaults.node.request_aggregator.max_queue);
ASSERT_EQ (conf.node.request_aggregator.threads, defaults.node.request_aggregator.threads);
ASSERT_EQ (conf.node.request_aggregator.batch_size, defaults.node.request_aggregator.batch_size);
ASSERT_EQ (conf.node.message_processor.threads, defaults.node.message_processor.threads);
ASSERT_EQ (conf.node.message_processor.max_queue, defaults.node.message_processor.max_queue);
}
/** Deserialize a node config with non-default values */
TEST (toml, daemon_config_deserialize_no_defaults)
TEST (toml_config, daemon_config_deserialize_no_defaults)
{
std::stringstream ss;
@ -462,6 +470,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
max_queued_requests = 999
request_aggregator_threads = 999
max_unchecked_blocks = 999
max_backlog = 999
frontiers_confirmation = "always"
enable_upnp = false
@ -470,6 +479,12 @@ TEST (toml, daemon_config_deserialize_no_defaults)
batch_size = 999
rate_limit = 999
[node.bounded_backlog]
enable = false
batch_size = 999
max_queued_notifications = 999
scan_rate = 999
[node.block_processor]
max_peer_queue = 999
max_system_queue = 999
@ -679,6 +694,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.io_threads, defaults.node.io_threads);
ASSERT_NE (conf.node.max_work_generate_multiplier, defaults.node.max_work_generate_multiplier);
ASSERT_NE (conf.node.max_unchecked_blocks, defaults.node.max_unchecked_blocks);
ASSERT_NE (conf.node.max_backlog, defaults.node.max_backlog);
ASSERT_NE (conf.node.network_threads, defaults.node.network_threads);
ASSERT_NE (conf.node.background_threads, defaults.node.background_threads);
ASSERT_NE (conf.node.secondary_work_peers, defaults.node.secondary_work_peers);
@ -704,10 +720,16 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.work_threads, defaults.node.work_threads);
ASSERT_NE (conf.node.max_queued_requests, defaults.node.max_queued_requests);
ASSERT_NE (conf.node.request_aggregator_threads, defaults.node.request_aggregator_threads);
ASSERT_NE (conf.node.enable_upnp, defaults.node.enable_upnp);
ASSERT_NE (conf.node.backlog_scan.enable, defaults.node.backlog_scan.enable);
ASSERT_NE (conf.node.backlog_scan.batch_size, defaults.node.backlog_scan.batch_size);
ASSERT_NE (conf.node.backlog_scan.rate_limit, defaults.node.backlog_scan.rate_limit);
ASSERT_NE (conf.node.enable_upnp, defaults.node.enable_upnp);
ASSERT_NE (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable);
ASSERT_NE (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size);
ASSERT_NE (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications);
ASSERT_NE (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate);
ASSERT_NE (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled);
ASSERT_NE (conf.node.websocket_config.address, defaults.node.websocket_config.address);
@ -804,7 +826,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
}
/** There should be no required values **/
TEST (toml, daemon_config_no_required)
TEST (toml_config, daemon_config_no_required)
{
std::stringstream ss;
@ -835,7 +857,7 @@ TEST (toml, daemon_config_no_required)
}
/** Deserialize an rpc config with non-default values */
TEST (toml, rpc_config_deserialize_no_defaults)
TEST (toml_config, rpc_config_deserialize_no_defaults)
{
std::stringstream ss;
@ -878,7 +900,7 @@ TEST (toml, rpc_config_deserialize_no_defaults)
}
/** There should be no required values **/
TEST (toml, rpc_config_no_required)
TEST (toml_config, rpc_config_no_required)
{
std::stringstream ss;
@ -900,7 +922,7 @@ TEST (toml, rpc_config_no_required)
}
/** Deserialize a node config with incorrect values */
TEST (toml, daemon_config_deserialize_errors)
TEST (toml_config, daemon_config_deserialize_errors)
{
{
std::stringstream ss;
@ -932,7 +954,7 @@ TEST (toml, daemon_config_deserialize_errors)
}
}
TEST (toml, daemon_read_config)
TEST (toml_config, daemon_read_config)
{
auto path (nano::unique_path ());
std::filesystem::create_directories (path);
@ -976,7 +998,7 @@ TEST (toml, daemon_read_config)
}
}
TEST (toml, log_config_defaults)
TEST (toml_config, log_config_defaults)
{
std::stringstream ss;
@ -1002,7 +1024,7 @@ TEST (toml, log_config_defaults)
ASSERT_EQ (confg.file.rotation_count, defaults.file.rotation_count);
}
TEST (toml, log_config_no_defaults)
TEST (toml_config, log_config_no_defaults)
{
std::stringstream ss;
@ -1044,7 +1066,7 @@ TEST (toml, log_config_no_defaults)
ASSERT_NE (confg.file.rotation_count, defaults.file.rotation_count);
}
TEST (toml, log_config_no_required)
TEST (toml_config, log_config_no_required)
{
std::stringstream ss;
@ -1065,7 +1087,7 @@ TEST (toml, log_config_no_required)
ASSERT_FALSE (toml.get_error ()) << toml.get_error ().get_message ();
}
TEST (toml, merge_config_files)
TEST (toml_config, merge_config_files)
{
nano::network_params network_params{ nano::network_constants::active_network };
nano::tomlconfig default_toml;

View file

@ -83,6 +83,7 @@ enum class type
local_block_broadcaster,
monitor,
confirming_set,
bounded_backlog,
// bootstrap
bulk_pull_client,

View file

@ -85,6 +85,7 @@ enum class type
active_elections_cancelled,
active_elections_cemented,
backlog_scan,
bounded_backlog,
backlog,
unchecked,
election_scheduler,
@ -203,6 +204,7 @@ enum class detail
unchecked,
local,
forced,
election,
// message specific
not_a_type,
@ -569,6 +571,14 @@ enum class detail
blocks_by_account,
account_info_by_hash,
// bounded backlog,
gathered_targets,
performing_rollbacks,
no_targets,
rollback_missing_block,
rollback_skipped,
loop_scan,
_last // Must be the last enum
};

View file

@ -100,6 +100,15 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::backlog_scan:
thread_role_name_string = "Backlog scan";
break;
case nano::thread_role::name::bounded_backlog:
thread_role_name_string = "Bounded backlog";
break;
case nano::thread_role::name::bounded_backlog_scan:
thread_role_name_string = "Bounded b scan";
break;
case nano::thread_role::name::bounded_backlog_notifications:
thread_role_name_string = "Bounded b notif";
break;
case nano::thread_role::name::vote_generator_queue:
thread_role_name_string = "Voting que";
break;

View file

@ -38,6 +38,9 @@ enum class name
db_parallel_traversal,
unchecked,
backlog_scan,
bounded_backlog,
bounded_backlog_scan,
bounded_backlog_notifications,
vote_generator_queue,
telemetry,
bootstrap,

View file

@ -24,6 +24,8 @@ add_library(
blockprocessor.cpp
bucketing.hpp
bucketing.cpp
bounded_backlog.hpp
bounded_backlog.cpp
bootstrap_weights_beta.hpp
bootstrap_weights_live.hpp
bootstrap/account_sets.hpp

View file

@ -51,7 +51,7 @@ nano::block_processor::block_processor (nano::node_config const & node_config, n
case nano::block_source::local:
return config.priority_local;
default:
return 1;
return config.priority_system;
}
};

View file

@ -24,6 +24,7 @@ enum class block_source
unchecked,
local,
forced,
election,
};
std::string_view to_string (block_source);
@ -47,6 +48,7 @@ public:
size_t priority_live{ 1 };
size_t priority_bootstrap{ 8 };
size_t priority_local{ 16 };
size_t priority_system{ 32 };
size_t batch_size{ 256 };
size_t max_queued_notifications{ 8 };

View file

@ -50,6 +50,16 @@ nano::bootstrap_service::bootstrap_service (nano::node_config const & node_confi
condition.notify_all ();
});
// Unblock rolled back accounts as the dependency is no longer valid
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
nano::lock_guard<nano::mutex> lock{ mutex };
for (auto const & block : blocks)
{
debug_assert (block != nullptr);
accounts.unblock (block->account ());
}
});
accounts.priority_set (node_config_a.network_params.ledger.genesis->account_field ().value ());
}

View file

@ -0,0 +1,564 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/node/backlog_scan.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bounded_backlog.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/node.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/secure/common.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
#include <nano/secure/ledger_set_confirmed.hpp>
#include <nano/secure/transaction.hpp>
nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::bucketing & bucketing_a, nano::backlog_scan & backlog_scan_a, nano::block_processor & block_processor_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ config_a },
node{ node_a },
ledger{ ledger_a },
bucketing{ bucketing_a },
backlog_scan{ backlog_scan_a },
block_processor{ block_processor_a },
confirming_set{ confirming_set_a },
stats{ stats_a },
logger{ logger_a },
scan_limiter{ config.bounded_backlog.scan_rate },
workers{ 1, nano::thread_role::name::bounded_backlog_notifications }
{
// Activate accounts with unconfirmed blocks
backlog_scan.batch_activated.add ([this] (auto const & batch) {
auto transaction = ledger.tx_begin_read ();
for (auto const & info : batch)
{
activate (transaction, info.account, info.account_info, info.conf_info);
}
});
// Erase accounts with all confirmed blocks
backlog_scan.batch_scanned.add ([this] (auto const & batch) {
nano::lock_guard<nano::mutex> guard{ mutex };
for (auto const & info : batch)
{
if (info.conf_info.height == info.account_info.block_count)
{
index.erase (info.account);
}
}
});
// Track unconfirmed blocks
block_processor.batch_processed.add ([this] (auto const & batch) {
auto transaction = ledger.tx_begin_read ();
for (auto const & [result, context] : batch)
{
if (result == nano::block_status::progress)
{
auto const & block = context.block;
insert (transaction, *block);
}
}
});
// Remove rolled back blocks from the backlog
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
nano::lock_guard<nano::mutex> guard{ mutex };
for (auto const & block : blocks)
{
index.erase (block->hash ());
}
});
// Remove cemented blocks from the backlog
confirming_set.batch_cemented.add ([this] (auto const & batch) {
nano::lock_guard<nano::mutex> guard{ mutex };
for (auto const & context : batch)
{
index.erase (context.block->hash ());
}
});
}
nano::bounded_backlog::~bounded_backlog ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!scan_thread.joinable ());
debug_assert (!workers.alive ());
}
void nano::bounded_backlog::start ()
{
debug_assert (!thread.joinable ());
if (!config.bounded_backlog.enable)
{
return;
}
workers.start ();
thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::bounded_backlog);
run ();
} };
scan_thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::bounded_backlog_scan);
run_scan ();
} };
}
void nano::bounded_backlog::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
if (scan_thread.joinable ())
{
scan_thread.join ();
}
workers.stop ();
}
size_t nano::bounded_backlog::index_size () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return index.size ();
}
void nano::bounded_backlog::activate (nano::secure::transaction & transaction, nano::account const & account, nano::account_info const & account_info, nano::confirmation_height_info const & conf_info)
{
debug_assert (conf_info.frontier != account_info.head);
// Insert blocks into the index starting from the account head block
auto block = ledger.any.block_get (transaction, account_info.head);
while (block)
{
// We reached the confirmed frontier, no need to track more blocks
if (block->hash () == conf_info.frontier)
{
break;
}
// Check if the block is already in the backlog, avoids unnecessary ledger lookups
if (contains (block->hash ()))
{
break;
}
bool inserted = insert (transaction, *block);
// If the block was not inserted, we already have it in the backlog
if (!inserted)
{
break;
}
transaction.refresh_if_needed ();
block = ledger.any.block_get (transaction, block->previous ());
}
}
void nano::bounded_backlog::update (nano::secure::transaction const & transaction, nano::block_hash const & hash)
{
// Erase if the block is either confirmed or missing
if (!ledger.unconfirmed_exists (transaction, hash))
{
nano::lock_guard<nano::mutex> guard{ mutex };
index.erase (hash);
}
}
bool nano::bounded_backlog::insert (nano::secure::transaction const & transaction, nano::block const & block)
{
auto const [priority_balance, priority_timestamp] = ledger.block_priority (transaction, block);
auto const bucket_index = bucketing.bucket_index (priority_balance);
nano::lock_guard<nano::mutex> guard{ mutex };
return index.insert (block, bucket_index, priority_timestamp);
}
bool nano::bounded_backlog::predicate () const
{
debug_assert (!mutex.try_lock ());
// Both ledger and tracked backlog must be over the threshold
return ledger.backlog_count () > config.max_backlog && index.size () > config.max_backlog;
}
void nano::bounded_backlog::run ()
{
std::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait_for (lock, 1s, [this] {
return stopped || predicate ();
});
if (stopped)
{
return;
}
// Wait until all notification about the previous rollbacks are processed
while (workers.queued_tasks () >= config.bounded_backlog.max_queued_notifications)
{
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped.load (); });
if (stopped)
{
return;
}
}
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop);
// Calculate the number of targets to rollback
uint64_t const backlog = ledger.backlog_count ();
uint64_t const target_count = backlog > config.max_backlog ? backlog - config.max_backlog : 0;
auto targets = gather_targets (std::min (target_count, static_cast<uint64_t> (config.bounded_backlog.batch_size)));
if (!targets.empty ())
{
lock.unlock ();
stats.add (nano::stat::type::bounded_backlog, nano::stat::detail::gathered_targets, targets.size ());
auto processed = perform_rollbacks (targets, target_count);
lock.lock ();
// Erase rolled back blocks from the index
for (auto const & hash : processed)
{
index.erase (hash);
}
}
else
{
// Cooldown, this should not happen in normal operation
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::no_targets);
condition.wait_for (lock, 100ms, [this] {
return stopped.load ();
});
}
}
}
bool nano::bounded_backlog::should_rollback (nano::block_hash const & hash) const
{
if (node.vote_cache.contains (hash))
{
return false;
}
if (node.vote_router.contains (hash))
{
return false;
}
if (node.active.recently_confirmed.exists (hash))
{
return false;
}
if (node.scheduler.contains (hash))
{
return false;
}
if (node.confirming_set.contains (hash))
{
return false;
}
if (node.local_block_broadcaster.contains (hash))
{
return false;
}
return true;
}
std::deque<nano::block_hash> nano::bounded_backlog::perform_rollbacks (std::deque<nano::block_hash> const & targets, size_t max_rollbacks)
{
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::performing_rollbacks);
auto transaction = ledger.tx_begin_write (nano::store::writer::bounded_backlog);
std::deque<nano::block_hash> processed;
for (auto const & hash : targets)
{
// Skip the rollback if the block is being used by the node, this should be race free as it's checked while holding the ledger write lock
if (!should_rollback (hash))
{
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::rollback_skipped);
continue;
}
// Here we check that the block is still OK to rollback, there could be a delay between gathering the targets and performing the rollbacks
if (auto block = ledger.any.block_get (transaction, hash))
{
logger.debug (nano::log::type::bounded_backlog, "Rolling back: {}, account: {}", hash.to_string (), block->account ().to_account ());
std::deque<std::shared_ptr<nano::block>> rollback_list;
bool error = ledger.rollback (transaction, hash, rollback_list);
stats.inc (nano::stat::type::bounded_backlog, error ? nano::stat::detail::rollback_failed : nano::stat::detail::rollback);
for (auto const & rollback : rollback_list)
{
processed.push_back (rollback->hash ());
}
// Notify observers of the rolled back blocks on a background thread, avoid dispatching notifications when holding ledger write transaction
workers.post ([this, rollback_list = std::move (rollback_list), root = block->qualified_root ()] {
// TODO: Calling block_processor's event here is not ideal, but duplicating these events is even worse
block_processor.rolled_back.notify (rollback_list, root);
});
// Return early if we reached the maximum number of rollbacks
if (processed.size () >= max_rollbacks)
{
break;
}
}
else
{
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::rollback_missing_block);
processed.push_back (hash);
}
}
return processed;
}
size_t nano::bounded_backlog::bucket_threshold () const
{
return config.max_backlog / bucketing.size ();
}
std::deque<nano::block_hash> nano::bounded_backlog::gather_targets (size_t max_count) const
{
debug_assert (!mutex.try_lock ());
std::deque<nano::block_hash> targets;
// Start rolling back from lowest index buckets first
for (auto bucket : bucketing.bucket_indices ())
{
// Only start rolling back if the bucket is over the threshold of unconfirmed blocks
if (index.size (bucket) > bucket_threshold ())
{
auto const count = std::min (max_count, config.bounded_backlog.batch_size);
auto const top = index.top (bucket, count, [this] (auto const & hash) {
// Only rollback if the block is not being used by the node
return should_rollback (hash);
});
for (auto const & entry : top)
{
targets.push_back (entry);
}
}
}
return targets;
}
void nano::bounded_backlog::run_scan ()
{
std::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
auto wait = [&] (auto count) {
while (!scan_limiter.should_pass (count))
{
condition.wait_for (lock, 100ms);
if (stopped)
{
return;
}
}
};
nano::block_hash last = 0;
while (!stopped)
{
wait (config.bounded_backlog.batch_size);
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop_scan);
auto batch = index.next (last, config.bounded_backlog.batch_size);
if (batch.empty ()) // If batch is empty, we iterated over all accounts in the index
{
break;
}
lock.unlock ();
{
auto transaction = ledger.tx_begin_read ();
for (auto const & hash : batch)
{
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::scanned);
update (transaction, hash);
last = hash;
}
}
lock.lock ();
}
}
}
bool nano::bounded_backlog::contains (nano::block_hash const & hash) const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return index.contains (hash);
}
nano::container_info nano::bounded_backlog::container_info () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
nano::container_info info;
info.put ("backlog", index.size ());
info.put ("notifications", workers.queued_tasks ());
info.add ("index", index.container_info ());
return info;
}
/*
* backlog_index
*/
bool nano::backlog_index::insert (nano::block const & block, nano::bucket_index bucket, nano::priority_timestamp priority)
{
auto const hash = block.hash ();
auto const account = block.account ();
entry new_entry{
.hash = hash,
.account = account,
.bucket = bucket,
.priority = priority,
};
auto [it, inserted] = blocks.emplace (new_entry);
if (inserted)
{
size_by_bucket[bucket]++;
return true;
}
return false;
}
bool nano::backlog_index::erase (nano::account const & account)
{
auto const [begin, end] = blocks.get<tag_account> ().equal_range (account);
for (auto it = begin; it != end;)
{
size_by_bucket[it->bucket]--;
it = blocks.get<tag_account> ().erase (it);
}
return begin != end;
}
bool nano::backlog_index::erase (nano::block_hash const & hash)
{
if (auto existing = blocks.get<tag_hash> ().find (hash); existing != blocks.get<tag_hash> ().end ())
{
size_by_bucket[existing->bucket]--;
blocks.get<tag_hash> ().erase (existing);
return true;
}
return false;
}
std::deque<nano::block_hash> nano::backlog_index::top (nano::bucket_index bucket, size_t count, filter_callback const & filter) const
{
// Highest timestamp, lowest priority, iterate in descending order
priority_key const starting_key{ bucket, std::numeric_limits<nano::priority_timestamp>::max () };
std::deque<nano::block_hash> results;
auto begin = blocks.get<tag_priority> ().lower_bound (starting_key);
for (auto it = begin; it != blocks.get<tag_priority> ().end () && it->bucket == bucket && results.size () < count; ++it)
{
if (filter (it->hash))
{
results.push_back (it->hash);
}
}
return results;
}
std::deque<nano::block_hash> nano::backlog_index::next (nano::block_hash last, size_t count) const
{
std::deque<block_hash> results;
auto it = blocks.get<tag_hash_ordered> ().upper_bound (last);
auto end = blocks.get<tag_hash_ordered> ().end ();
for (; it != end && results.size () < count; ++it)
{
results.push_back (it->hash);
}
return results;
}
bool nano::backlog_index::contains (nano::block_hash const & hash) const
{
return blocks.get<tag_hash> ().contains (hash);
}
size_t nano::backlog_index::size () const
{
return blocks.size ();
}
size_t nano::backlog_index::size (nano::bucket_index bucket) const
{
if (auto it = size_by_bucket.find (bucket); it != size_by_bucket.end ())
{
return it->second;
}
return 0;
}
nano::container_info nano::backlog_index::container_info () const
{
auto collect_bucket_sizes = [&] () {
nano::container_info info;
for (auto [bucket, count] : size_by_bucket)
{
info.put (std::to_string (bucket), count);
}
return info;
};
nano::container_info info;
info.put ("blocks", blocks);
info.add ("sizes", collect_bucket_sizes ());
return info;
}
/*
* bounded_backlog_config
*/
nano::error nano::bounded_backlog_config::serialize (nano::tomlconfig & toml) const
{
toml.put ("enable", enable, "Enable the bounded backlog. \ntype:bool");
toml.put ("batch_size", batch_size, "Maximum number of blocks to rollback per iteration. \ntype:uint64");
toml.put ("max_queued_notifications", max_queued_notifications, "Maximum number of queued background tasks before cooldown. \ntype:uint64");
toml.put ("scan_rate", scan_rate, "Rate limit for refreshing the backlog index. \ntype:uint64");
return toml.get_error ();
}
nano::error nano::bounded_backlog_config::deserialize (nano::tomlconfig & toml)
{
toml.get ("enable", enable);
toml.get ("batch_size", batch_size);
toml.get ("max_queued_notifications", max_queued_notifications);
toml.get ("scan_rate", scan_rate);
return toml.get_error ();
}

View file

@ -0,0 +1,161 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/numbers_templ.hpp>
#include <nano/lib/observer_set.hpp>
#include <nano/lib/rate_limiting.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/node/bucketing.hpp>
#include <nano/node/fwd.hpp>
#include <nano/secure/common.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
#include <unordered_map>
namespace mi = boost::multi_index;
namespace nano
{
class backlog_index
{
public:
struct priority_key
{
nano::bucket_index bucket;
nano::priority_timestamp priority;
auto operator<=> (priority_key const &) const = default;
};
struct entry
{
nano::block_hash hash;
nano::account account;
nano::bucket_index bucket;
nano::priority_timestamp priority;
backlog_index::priority_key priority_key () const
{
return { bucket, priority };
}
};
public:
backlog_index () = default;
bool insert (nano::block const & block, nano::bucket_index, nano::priority_timestamp);
bool erase (nano::account const & account);
bool erase (nano::block_hash const & hash);
using filter_callback = std::function<bool (nano::block_hash const &)>;
std::deque<nano::block_hash> top (nano::bucket_index, size_t count, filter_callback const &) const;
std::deque<nano::block_hash> next (nano::block_hash last, size_t count) const;
bool contains (nano::block_hash const & hash) const;
size_t size () const;
size_t size (nano::bucket_index) const;
nano::container_info container_info () const;
private:
// clang-format off
class tag_hash {};
class tag_hash_ordered {};
class tag_account {};
class tag_priority {};
using ordered_blocks = boost::multi_index_container<entry,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_hash>, // Allows for fast lookup
mi::member<entry, nano::block_hash, &entry::hash>>,
mi::ordered_unique<mi::tag<tag_hash_ordered>, // Allows for sequential scan
mi::member<entry, nano::block_hash, &entry::hash>>,
mi::hashed_non_unique<mi::tag<tag_account>,
mi::member<entry, nano::account, &entry::account>>,
mi::ordered_non_unique<mi::tag<tag_priority>,
mi::const_mem_fun<entry, priority_key, &entry::priority_key>, std::greater<>> // DESC order
>>;
// clang-format on
ordered_blocks blocks;
// Keep track of the size of the backlog in number of unconfirmed blocks per bucket
std::unordered_map<nano::bucket_index, size_t> size_by_bucket;
};
class bounded_backlog_config
{
public:
nano::error deserialize (nano::tomlconfig &);
nano::error serialize (nano::tomlconfig &) const;
public:
bool enable{ true };
size_t batch_size{ 32 };
size_t max_queued_notifications{ 128 };
size_t scan_rate{ 64 };
};
class bounded_backlog
{
public:
bounded_backlog (nano::node_config const &, nano::node &, nano::ledger &, nano::bucketing &, nano::backlog_scan &, nano::block_processor &, nano::confirming_set &, nano::stats &, nano::logger &);
~bounded_backlog ();
void start ();
void stop ();
size_t index_size () const;
size_t bucket_threshold () const;
bool contains (nano::block_hash const &) const;
nano::container_info container_info () const;
private: // Dependencies
nano::node_config const & config;
nano::node & node;
nano::ledger & ledger;
nano::bucketing & bucketing;
nano::backlog_scan & backlog_scan;
nano::block_processor & block_processor;
nano::confirming_set & confirming_set;
nano::stats & stats;
nano::logger & logger;
private:
void activate (nano::secure::transaction &, nano::account const &, nano::account_info const &, nano::confirmation_height_info const &);
void update (nano::secure::transaction const &, nano::block_hash const &);
bool insert (nano::secure::transaction const &, nano::block const &);
bool predicate () const;
void run ();
std::deque<nano::block_hash> gather_targets (size_t max_count) const;
bool should_rollback (nano::block_hash const &) const;
std::deque<nano::block_hash> perform_rollbacks (std::deque<nano::block_hash> const & targets, size_t max_rollbacks);
void run_scan ();
private:
nano::backlog_index index;
nano::rate_limiter scan_limiter;
std::atomic<bool> stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
std::thread scan_thread;
nano::thread_pool workers;
};
}

View file

@ -399,7 +399,7 @@ void nano::election::confirm_if_quorum (nano::unique_lock<nano::mutex> & lock_a)
{
debug_assert (lock_a.owns_lock ());
auto tally_l (tally_impl ());
debug_assert (!tally_l.empty ());
release_assert (!tally_l.empty ());
auto winner (tally_l.begin ());
auto block_l (winner->second);
auto const & winner_hash_l (block_l->hash ());
@ -425,6 +425,8 @@ void nano::election::confirm_if_quorum (nano::unique_lock<nano::mutex> & lock_a)
}
if (final_weight >= node.online_reps.delta ())
{
// In some edge cases block might get rolled back while the election is confirming, reprocess it to ensure it's present in the ledger
node.block_processor.add (block_l, nano::block_source::election);
confirm_once (lock_a);
debug_assert (!lock_a.owns_lock ());
}

View file

@ -9,7 +9,9 @@ namespace nano
{
class account_sets_config;
class active_elections;
class backlog_scan;
class block_processor;
class bounded_backlog;
class bucketing;
class bootstrap_config;
class bootstrap_server;

View file

@ -103,6 +103,12 @@ void nano::local_block_broadcaster::stop ()
nano::join_or_pass (thread);
}
bool nano::local_block_broadcaster::contains (nano::block_hash const & hash) const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return local_blocks.get<tag_hash> ().contains (hash);
}
size_t nano::local_block_broadcaster::size () const
{
nano::lock_guard<nano::mutex> lock{ mutex };

View file

@ -59,6 +59,7 @@ public:
void start ();
void stop ();
bool contains (nano::block_hash const &) const;
size_t size () const;
nano::container_info container_info () const;

View file

@ -14,6 +14,7 @@
#include <nano/node/bootstrap/bootstrap_service.hpp>
#include <nano/node/bootstrap_weights_beta.hpp>
#include <nano/node/bootstrap_weights_live.hpp>
#include <nano/node/bounded_backlog.hpp>
#include <nano/node/bucketing.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/daemonconfig.hpp>
@ -159,6 +160,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
wallets (wallets_store.init_error (), *this),
backlog_scan_impl{ std::make_unique<nano::backlog_scan> (config.backlog_scan, ledger, stats) },
backlog_scan{ *backlog_scan_impl },
backlog_impl{ std::make_unique<nano::bounded_backlog> (config, *this, ledger, bucketing, backlog_scan, block_processor, confirming_set, stats, logger) },
backlog{ *backlog_impl },
bootstrap_server_impl{ std::make_unique<nano::bootstrap_server> (config.bootstrap_server, store, ledger, network_params.network, stats) },
bootstrap_server{ *bootstrap_server_impl },
bootstrap_impl{ std::make_unique<nano::bootstrap_service> (config, block_processor, ledger, network, stats, logger) },
@ -651,6 +654,7 @@ void nano::node::start ()
scheduler.start ();
aggregator.start ();
backlog_scan.start ();
backlog.start ();
bootstrap_server.start ();
bootstrap.start ();
websocket.start ();
@ -683,6 +687,7 @@ void nano::node::stop ()
distributed_work.stop ();
backlog_scan.stop ();
bootstrap.stop ();
backlog.stop ();
rep_crawler.stop ();
unchecked.stop ();
block_processor.stop ();
@ -1211,6 +1216,7 @@ nano::container_info nano::node::container_info () const
info.add ("message_processor", message_processor.container_info ());
info.add ("bandwidth", outbound_limiter.container_info ());
info.add ("backlog_scan", backlog_scan.container_info ());
info.add ("bounded_backlog", backlog.container_info ());
return info;
}

View file

@ -33,7 +33,6 @@
namespace nano
{
class active_elections;
class backlog_scan;
class bandwidth_limiter;
class confirming_set;
class message_processor;
@ -203,6 +202,8 @@ public:
nano::wallets wallets;
std::unique_ptr<nano::backlog_scan> backlog_scan_impl;
nano::backlog_scan & backlog_scan;
std::unique_ptr<nano::bounded_backlog> backlog_impl;
nano::bounded_backlog & backlog;
std::unique_ptr<nano::bootstrap_server> bootstrap_server_impl;
nano::bootstrap_server & bootstrap_server;
std::unique_ptr<nano::bootstrap_service> bootstrap_impl;

View file

@ -140,6 +140,7 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
toml.put ("max_queued_requests", max_queued_requests, "Limit for number of queued confirmation requests for one channel, after which new requests are dropped until the queue drops below this value.\ntype:uint32");
toml.put ("request_aggregator_threads", request_aggregator_threads, "Number of threads to dedicate to request aggregator. Defaults to using all cpu threads, up to a maximum of 4");
toml.put ("max_unchecked_blocks", max_unchecked_blocks, "Maximum number of unchecked blocks to store in memory. Defaults to 65536. \ntype:uint64,[0..]");
toml.put ("max_backlog", max_backlog, "Maximum number of unconfirmed blocks to keep in the ledger. If this limit is exceeded, the node will start dropping low-priority unconfirmed blocks.\ntype:uint64");
toml.put ("rep_crawler_weight_minimum", rep_crawler_weight_minimum.to_string_dec (), "Rep crawler minimum weight, if this is less than minimum principal weight then this is taken as the minimum weight a rep must have to be tracked. If you want to track all reps set this to 0. If you do not want this to influence anything then set it to max value. This is only useful for debugging or for people who really know what they are doing.\ntype:string,amount,raw");
toml.put ("enable_upnp", enable_upnp, "Enable or disable automatic UPnP port forwarding. This feature only works if the node is directly connected to a router (not inside a docker container, etc.).\ntype:bool");
@ -262,6 +263,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
backlog_scan.serialize (backlog_scan_l);
toml.put_child ("backlog_scan", backlog_scan_l);
nano::tomlconfig bounded_backlog_l;
bounded_backlog.serialize (bounded_backlog_l);
toml.put_child ("bounded_backlog", bounded_backlog_l);
return toml.get_error ();
}
@ -401,6 +406,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
backlog_scan.deserialize (config_l);
}
if (toml.has_key ("bounded_backlog"))
{
auto config_l = toml.get_required_child ("bounded_backlog");
bounded_backlog.deserialize (config_l);
}
/*
* Values
*/
@ -552,6 +563,7 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
toml.get<uint32_t> ("request_aggregator_threads", request_aggregator_threads);
toml.get<unsigned> ("max_unchecked_blocks", max_unchecked_blocks);
toml.get<std::size_t> ("max_backlog", max_backlog);
auto rep_crawler_weight_minimum_l (rep_crawler_weight_minimum.to_string_dec ());
if (toml.has_key ("rep_crawler_weight_minimum"))

View file

@ -13,6 +13,7 @@
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap_config.hpp>
#include <nano/node/bootstrap/bootstrap_server.hpp>
#include <nano/node/bounded_backlog.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/ipc/ipc_config.hpp>
#include <nano/node/local_block_broadcaster.hpp>
@ -128,11 +129,14 @@ public:
uint32_t max_queued_requests{ 512 };
unsigned request_aggregator_threads{ std::min (nano::hardware_concurrency (), 4u) }; // Max 4 threads if available
unsigned max_unchecked_blocks{ 65536 };
std::size_t max_backlog{ 100000 };
std::chrono::seconds max_pruning_age{ !network_params.network.is_beta_network () ? std::chrono::seconds (24 * 60 * 60) : std::chrono::seconds (5 * 60) }; // 1 day; 5 minutes for beta network
uint64_t max_pruning_depth{ 0 };
nano::rocksdb_config rocksdb_config;
nano::lmdb_config lmdb_config;
bool enable_upnp{ true };
public:
nano::vote_cache_config vote_cache;
nano::rep_crawler_config rep_crawler;
nano::block_processor_config block_processor;
@ -147,6 +151,7 @@ public:
nano::confirming_set_config confirming_set;
nano::monitor_config monitor;
nano::backlog_scan_config backlog_scan;
nano::bounded_backlog_config bounded_backlog;
public:
/** Entry is ignored if it cannot be parsed as a valid address:port */

View file

@ -792,6 +792,11 @@ void nano::ledger::initialize (nano::generate_cache_flags const & generate_cache
cache.pruned_count = store.pruned.count (transaction);
}
bool nano::ledger::unconfirmed_exists (secure::transaction const & transaction, nano::block_hash const & hash)
{
return any.block_exists (transaction, hash) && !confirmed.block_exists (transaction, hash);
}
nano::uint128_t nano::ledger::account_receivable (secure::transaction const & transaction_a, nano::account const & account_a, bool only_confirmed_a)
{
nano::uint128_t result (0);
@ -1537,6 +1542,13 @@ uint64_t nano::ledger::pruned_count () const
return cache.pruned_count;
}
uint64_t nano::ledger::backlog_count () const
{
auto blocks = cache.block_count.load ();
auto cemented = cache.cemented_count.load ();
return (blocks > cemented) ? blocks - cemented : 0;
}
nano::container_info nano::ledger::container_info () const
{
nano::container_info info;

View file

@ -43,6 +43,7 @@ public:
/** Start read-only transaction */
secure::read_transaction tx_begin_read () const;
bool unconfirmed_exists (secure::transaction const &, nano::block_hash const &);
nano::uint128_t account_receivable (secure::transaction const &, nano::account const &, bool = false);
/**
* Returns the cached vote weight for the given representative.
@ -83,6 +84,7 @@ public:
uint64_t block_count () const;
uint64_t account_count () const;
uint64_t pruned_count () const;
uint64_t backlog_count () const;
// Returned priority balance is maximum of block balance and previous block balance to handle full account balance send cases
// Returned timestamp is the previous block timestamp or the current timestamp if there's no previous block

View file

@ -123,11 +123,19 @@ std::optional<nano::amount> nano::ledger_set_any::block_balance (secure::transac
bool nano::ledger_set_any::block_exists (secure::transaction const & transaction, nano::block_hash const & hash) const
{
if (hash.is_zero ())
{
return false;
}
return ledger.store.block.exists (transaction, hash);
}
bool nano::ledger_set_any::block_exists_or_pruned (secure::transaction const & transaction, nano::block_hash const & hash) const
{
if (hash.is_zero ())
{
return false;
}
if (ledger.store.pruned.exists (transaction, hash))
{
return true;
@ -137,6 +145,10 @@ bool nano::ledger_set_any::block_exists_or_pruned (secure::transaction const & t
std::shared_ptr<nano::block> nano::ledger_set_any::block_get (secure::transaction const & transaction, nano::block_hash const & hash) const
{
if (hash.is_zero ())
{
return nullptr;
}
return ledger.store.block.get (transaction, hash);
}

View file

@ -45,10 +45,6 @@ uint64_t nano::ledger_set_confirmed::account_height (secure::transaction const &
std::optional<nano::amount> nano::ledger_set_confirmed::block_balance (secure::transaction const & transaction, nano::block_hash const & hash) const
{
if (hash.is_zero ())
{
return std::nullopt;
}
auto block = block_get (transaction, hash);
if (!block)
{
@ -64,6 +60,10 @@ bool nano::ledger_set_confirmed::block_exists (secure::transaction const & trans
bool nano::ledger_set_confirmed::block_exists_or_pruned (secure::transaction const & transaction, nano::block_hash const & hash) const
{
if (hash.is_zero ())
{
return false;
}
if (ledger.store.pruned.exists (transaction, hash))
{
return true;
@ -73,6 +73,10 @@ bool nano::ledger_set_confirmed::block_exists_or_pruned (secure::transaction con
std::shared_ptr<nano::block> nano::ledger_set_confirmed::block_get (secure::transaction const & transaction, nano::block_hash const & hash) const
{
if (hash.is_zero ())
{
return nullptr;
}
auto block = ledger.store.block.get (transaction, hash);
if (!block)
{

View file

@ -27,6 +27,9 @@ public:
// Conversion operator to const nano::store::transaction&
virtual operator const nano::store::transaction & () const = 0;
// Certain transactions may need to be refreshed if they are held for a long time
virtual bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) = 0;
};
class write_transaction final : public transaction
@ -69,7 +72,7 @@ public:
renew ();
}
bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 })
bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) override
{
auto now = std::chrono::steady_clock::now ();
if (now - start > max_age)
@ -119,9 +122,9 @@ public:
txn.refresh ();
}
void refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 })
bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) override
{
txn.refresh_if_needed (max_age);
return txn.refresh_if_needed (max_age);
}
auto timestamp () const

View file

@ -83,13 +83,15 @@ void nano::store::read_transaction::refresh ()
renew ();
}
void nano::store::read_transaction::refresh_if_needed (std::chrono::milliseconds max_age)
bool nano::store::read_transaction::refresh_if_needed (std::chrono::milliseconds max_age)
{
auto now = std::chrono::steady_clock::now ();
if (now - start > max_age)
{
refresh ();
return true;
}
return false;
}
/*

View file

@ -66,7 +66,7 @@ public:
void reset ();
void renew ();
void refresh ();
void refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 });
bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 });
private:
std::unique_ptr<read_transaction_impl> impl;

View file

@ -17,6 +17,7 @@ enum class writer
confirmation_height,
pruning,
voting_final,
bounded_backlog,
testing // Used in tests to emulate a write lock
};