Election scheduler (#3208)

* This change replaces PoW-difficulty based prioritization with balance * time_since_use based prioritization. The scheduler currently manages two queues. A priority queue where live traffic is managed and scheduled according to vacancy in the active_transactions container and prioritized by the prioritization container. A manual queue where requests that have come through RPCs are enqueued and bypass prioritization.
This commit is contained in:
clemahieu 2021-04-25 15:24:52 +02:00 committed by GitHub
commit c6e52dab8f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 486 additions and 129 deletions

View file

@ -14,6 +14,7 @@ add_executable(
difficulty.cpp
distributed_work.cpp
election.cpp
election_scheduler.cpp
epochs.cpp
frontiers_confirmation.cpp
gap_cache.cpp

View file

@ -133,7 +133,7 @@ TEST (active_transactions, keep_local)
auto send5 (wallet.send_action (nano::dev_genesis_key.pub, key5.pub, node.config.receive_minimum.number ()));
auto send6 (wallet.send_action (nano::dev_genesis_key.pub, key6.pub, node.config.receive_minimum.number ()));
// should not drop wallet created transactions
ASSERT_TIMELY (5s, node.active.size () == 6);
ASSERT_TIMELY (5s, node.active.size () == 1);
for (auto const & block : { send1, send2, send3, send4, send5, send6 })
{
ASSERT_TIMELY (1s, node.active.election (block->qualified_root ()));
@ -175,8 +175,8 @@ TEST (active_transactions, keep_local)
node.process_active (open3);
node.block_processor.flush ();
// bound elections, should drop after one loop
ASSERT_TIMELY (5s, node.active.size () == node_config.active_elections_size);
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::election_drop));
ASSERT_TIMELY (1s, node.active.size () == node_config.active_elections_size);
ASSERT_EQ (1, node.scheduler.size ());
}
TEST (active_transactions, inactive_votes_cache)
@ -342,7 +342,8 @@ TEST (active_transactions, inactive_votes_cache_multiple_votes)
node.vote_processor.vote (vote2, std::make_shared<nano::transport::channel_loopback> (node));
ASSERT_TIMELY (5s, node.active.find_inactive_votes_cache (send1->hash ()).voters.size () == 2);
ASSERT_EQ (1, node.active.inactive_votes_cache_size ());
node.active.insert (send1);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
node.scheduler.flush ();
auto election = node.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (3, election->votes ().size ()); // 2 votes and 1 default not_an_acount
@ -510,7 +511,7 @@ TEST (active_transactions, inactive_votes_cache_election_start)
ASSERT_TIMELY (5s, 13 == node.ledger.cache.cemented_count);
}
TEST (active_transactions, update_difficulty)
TEST (active_transactions, DISABLED_update_difficulty)
{
nano::system system (2);
auto & node1 = *system.nodes[0];
@ -557,6 +558,7 @@ TEST (active_transactions, update_difficulty)
node1.process_active (send1);
node1.process_active (send2);
node1.block_processor.flush ();
node1.scheduler.flush ();
// Share the updated blocks
node1.network.flood_block (send1);
node1.network.flood_block (send2);
@ -701,7 +703,8 @@ TEST (active_transactions, dropped_cleanup)
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
node.active.insert (block);
node.block_confirm (block);
node.scheduler.flush ();
auto election = node.active.election (block->qualified_root ());
ASSERT_NE (nullptr, election);
@ -725,6 +728,7 @@ TEST (active_transactions, dropped_cleanup)
// Repeat test for a confirmed election
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
node.block_confirm (block);
node.scheduler.flush ();
election = node.active.election (block->qualified_root ());
ASSERT_NE (nullptr, election);
election->force_confirm ();
@ -848,6 +852,7 @@ TEST (active_transactions, fork_filter_cleanup)
.build_shared ();
node1.process_active (fork);
node1.block_processor.flush ();
node1.scheduler.flush ();
}
ASSERT_EQ (1, node1.active.size ());
@ -1028,7 +1033,7 @@ TEST (active_transactions, confirmation_consistency)
system.deadline_set (5s);
while (!node.ledger.block_confirmed (node.store.tx_begin_read (), block->hash ()))
{
node.active.insert (block);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
ASSERT_NO_ERROR (system.poll (5ms));
}
ASSERT_NO_ERROR (system.poll_until_true (1s, [&node, &block, i] {
@ -1128,24 +1133,31 @@ TEST (active_transactions, insertion_prioritization)
};
node.block_confirm (blocks[2]);
node.scheduler.flush ();
ASSERT_TRUE (node.active.election (blocks[2]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[3]);
node.scheduler.flush ();
ASSERT_FALSE (node.active.election (blocks[3]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[1]);
node.scheduler.flush ();
ASSERT_TRUE (node.active.election (blocks[1]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[4]);
node.scheduler.flush ();
ASSERT_FALSE (node.active.election (blocks[4]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[0]);
node.scheduler.flush ();
ASSERT_TRUE (node.active.election (blocks[0]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[5]);
node.scheduler.flush ();
ASSERT_FALSE (node.active.election (blocks[5]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[6]);
node.scheduler.flush ();
ASSERT_FALSE (node.active.election (blocks[6]->qualified_root ())->prioritized ());
ASSERT_EQ (4, node.stats.count (nano::stat::type::election, nano::stat::detail::election_non_priority));
@ -1262,6 +1274,7 @@ TEST (active_transactions, election_difficulty_update_old)
auto send1_copy = builder.make_block ().from (*send1).build_shared ();
node.process_active (send1);
node.block_processor.flush ();
node.scheduler.flush ();
ASSERT_EQ (1, node.active.size ());
auto multiplier = node.active.roots.begin ()->multiplier;
{
@ -1373,6 +1386,7 @@ TEST (active_transactions, election_difficulty_update_fork)
node.process_active (fork_change);
node.block_processor.flush ();
node.scheduler.flush ();
ASSERT_EQ (1, node.active.size ());
auto multiplier_change = node.active.roots.begin ()->multiplier;
node.process_active (fork_send);
@ -1421,6 +1435,7 @@ TEST (active_transactions, confirm_new)
.build_shared ();
node1.process_active (send);
node1.block_processor.flush ();
node1.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
auto & node2 = *system.add_node ();
// Add key to node2
@ -1516,6 +1531,7 @@ TEST (active_transactions, conflicting_block_vote_existing_election)
auto vote_fork (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, std::numeric_limits<uint64_t>::max (), fork));
ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
node.scheduler.flush ();
ASSERT_EQ (1, node.active.size ());
// Vote for conflicting block, but the block does not yet exist in the ledger
@ -1591,18 +1607,19 @@ TEST (active_transactions, activate_account_chain)
ASSERT_EQ (nano::process_result::progress, node.process (*open).code);
ASSERT_EQ (nano::process_result::progress, node.process (*receive).code);
node.active.activate (nano::dev_genesis_key.pub);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
node.scheduler.flush ();
auto election1 = node.active.election (send->qualified_root ());
ASSERT_EQ (1, node.active.size ());
ASSERT_EQ (1, election1->blocks ().count (send->hash ()));
node.active.activate (nano::dev_genesis_key.pub);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
auto election2 = node.active.election (send->qualified_root ());
ASSERT_EQ (election2, election1);
election1->force_confirm ();
ASSERT_TIMELY (3s, node.block_confirmed (send->hash ()));
// On cementing, the next election is started
ASSERT_TIMELY (3s, node.active.active (send2->qualified_root ()));
node.active.activate (nano::dev_genesis_key.pub);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
auto election3 = node.active.election (send2->qualified_root ());
ASSERT_NE (nullptr, election3);
ASSERT_EQ (1, election3->blocks ().count (send2->hash ()));
@ -1611,11 +1628,11 @@ TEST (active_transactions, activate_account_chain)
// On cementing, the next election is started
ASSERT_TIMELY (3s, node.active.active (open->qualified_root ()));
ASSERT_TIMELY (3s, node.active.active (send3->qualified_root ()));
node.active.activate (nano::dev_genesis_key.pub);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
auto election4 = node.active.election (send3->qualified_root ());
ASSERT_NE (nullptr, election4);
ASSERT_EQ (1, election4->blocks ().count (send3->hash ()));
node.active.activate (key.pub);
node.scheduler.activate (key.pub, node.store.tx_begin_read ());
auto election5 = node.active.election (open->qualified_root ());
ASSERT_NE (nullptr, election5);
ASSERT_EQ (1, election5->blocks ().count (open->hash ()));
@ -1623,7 +1640,7 @@ TEST (active_transactions, activate_account_chain)
ASSERT_TIMELY (3s, node.block_confirmed (open->hash ()));
// Until send3 is also confirmed, the receive block should not activate
std::this_thread::sleep_for (200ms);
node.active.activate (key.pub);
node.scheduler.activate (key.pub, node.store.tx_begin_read ());
election4->force_confirm ();
ASSERT_TIMELY (3s, node.block_confirmed (send3->hash ()));
ASSERT_TIMELY (3s, node.active.active (receive->qualified_root ()));
@ -1926,7 +1943,7 @@ TEST (active_transactions, vacancy)
ASSERT_EQ (nano::process_result::progress, node.process (*send).code);
ASSERT_EQ (1, node.active.vacancy ());
ASSERT_EQ (0, node.active.size ());
node.active.activate (nano::dev_genesis_key.pub);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
ASSERT_TIMELY (1s, updated);
updated = false;
ASSERT_EQ (0, node.active.vacancy ());

View file

@ -1759,6 +1759,7 @@ TEST (bulk, offline_send)
ASSERT_NE (nullptr, send1);
ASSERT_NE (std::numeric_limits<nano::uint256_t>::max (), node1->balance (nano::dev_genesis_key.pub));
node1->block_processor.flush ();
node1->scheduler.flush ();
// Wait to finish election background tasks
ASSERT_TIMELY (10s, node1->active.empty ());
ASSERT_TRUE (node1->block_confirmed (send1->hash ()));

View file

@ -663,9 +663,11 @@ TEST (confirmation_height, conflict_rollback_cemented)
auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ()));
node1->network.process_message (publish1, channel1);
node1->block_processor.flush ();
node1->scheduler.flush ();
auto channel2 (node2->network.udp_channels.create (node1->network.endpoint ()));
node2->network.process_message (publish2, channel2);
node2->block_processor.flush ();
node2->scheduler.flush ();
ASSERT_EQ (1, node1->active.size ());
ASSERT_EQ (1, node2->active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);

View file

@ -18,7 +18,8 @@ TEST (conflicts, start_stop)
node1.work_generate_blocking (*send1);
ASSERT_EQ (nano::process_result::progress, node1.process (*send1).code);
ASSERT_EQ (0, node1.active.size ());
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
node1.scheduler.flush ();
auto election1 = node1.active.election (send1->qualified_root ());
ASSERT_EQ (1, node1.active.size ());
ASSERT_NE (nullptr, election1);
@ -34,11 +35,11 @@ TEST (conflicts, add_existing)
auto send1 (std::make_shared<nano::send_block> (genesis.hash (), key1.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0));
node1.work_generate_blocking (*send1);
ASSERT_EQ (nano::process_result::progress, node1.process (*send1).code);
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
nano::keypair key2;
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0));
send2->sideband_set ({});
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto election1 = node1.active.election (send2->qualified_root ());
ASSERT_EQ (1, node1.active.size ());
auto vote1 (std::make_shared<nano::vote> (key2.pub, key2.prv, 0, send2));
@ -64,7 +65,8 @@ TEST (conflicts, add_two)
auto send2 (std::make_shared<nano::send_block> (send1->hash (), key2.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0));
node1.work_generate_blocking (*send2);
ASSERT_EQ (nano::process_result::progress, node1.process (*send2).code);
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
node1.scheduler.flush ();
ASSERT_EQ (2, node1.active.size ());
}
@ -168,6 +170,7 @@ TEST (conflicts, reprioritize)
nano::send_block send1_copy (*send1);
node1.process_active (send1);
node1.block_processor.flush ();
node1.scheduler.flush ();
{
nano::lock_guard<nano::mutex> guard (node1.active.mutex);
auto existing1 (node1.active.roots.find (send1->qualified_root ()));

View file

@ -13,6 +13,7 @@ TEST (election, construction)
auto & node = *system.nodes[0];
genesis.open->sideband_set (nano::block_sideband (nano::genesis_account, 0, nano::genesis_amount, 1, nano::seconds_since_epoch (), nano::epoch::epoch_0, false, false, false, nano::epoch::epoch_0));
node.block_confirm (genesis.open);
node.scheduler.flush ();
auto election = node.active.election (genesis.open->qualified_root ());
election->transition_active ();
}
@ -49,8 +50,10 @@ TEST (election, quorum_minimum_flip_success)
node1.work_generate_blocking (*send2);
node1.process_active (send1);
node1.block_processor.flush ();
node1.scheduler.flush ();
node1.process_active (send2);
node1.block_processor.flush ();
node1.scheduler.flush ();
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (2, election->blocks ().size ());
@ -93,8 +96,10 @@ TEST (election, quorum_minimum_flip_fail)
node1.work_generate_blocking (*send2);
node1.process_active (send1);
node1.block_processor.flush ();
node1.scheduler.flush ();
node1.process_active (send2);
node1.block_processor.flush ();
node1.scheduler.flush ();
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (2, election->blocks ().size ());
@ -126,7 +131,7 @@ TEST (election, quorum_minimum_confirm_success)
node1.work_generate_blocking (*send1);
node1.process_active (send1);
node1.block_processor.flush ();
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (1, election->blocks ().size ());
@ -158,7 +163,7 @@ TEST (election, quorum_minimum_confirm_fail)
node1.work_generate_blocking (*send1);
node1.process_active (send1);
node1.block_processor.flush ();
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (1, election->blocks ().size ());
@ -228,7 +233,7 @@ TEST (election, quorum_minimum_update_weight_before_quorum_checks)
node2.block_processor.flush ();
ASSERT_EQ (node2.ledger.cache.block_count, 4);
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (1, election->blocks ().size ());

View file

@ -0,0 +1,120 @@
#include <nano/node/election_scheduler.hpp>
#include <nano/node/testing.hpp>
#include <nano/test_common/testutil.hpp>
#include <gtest/gtest.h>
#include <chrono>
using namespace std::chrono_literals;
TEST (election_scheduler, construction)
{
nano::system system{ 1 };
}
TEST (election_scheduler, activate_one_timely)
{
nano::system system{ 1 };
nano::state_block_builder builder;
auto send1 = builder.make_block ()
.account (nano::dev_genesis_key.pub)
.previous (nano::genesis_hash)
.representative (nano::dev_genesis_key.pub)
.balance (nano::genesis_amount - nano::Gxrb_ratio)
.link (nano::dev_genesis_key.pub)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (nano::genesis_hash))
.build_shared ();
system.nodes[0]->ledger.process (system.nodes[0]->store.tx_begin_write (), *send1);
system.nodes[0]->scheduler.activate (nano::dev_genesis_key.pub, system.nodes[0]->store.tx_begin_read ());
ASSERT_TIMELY (1s, system.nodes[0]->active.election (send1->qualified_root ()));
}
TEST (election_scheduler, activate_one_flush)
{
nano::system system{ 1 };
nano::state_block_builder builder;
auto send1 = builder.make_block ()
.account (nano::dev_genesis_key.pub)
.previous (nano::genesis_hash)
.representative (nano::dev_genesis_key.pub)
.balance (nano::genesis_amount - nano::Gxrb_ratio)
.link (nano::dev_genesis_key.pub)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (nano::genesis_hash))
.build_shared ();
system.nodes[0]->ledger.process (system.nodes[0]->store.tx_begin_write (), *send1);
system.nodes[0]->scheduler.activate (nano::dev_genesis_key.pub, system.nodes[0]->store.tx_begin_read ());
system.nodes[0]->scheduler.flush ();
ASSERT_NE (nullptr, system.nodes[0]->active.election (send1->qualified_root ()));
}
TEST (election_scheduler, no_vacancy)
{
nano::system system;
nano::node_config config{ nano::get_available_port (), system.logging };
config.active_elections_size = 1;
auto & node = *system.add_node (config);
nano::state_block_builder builder;
nano::keypair key;
// Activating accounts depends on confirmed dependencies. First, prepare 2 accounts
auto send = builder.make_block ()
.account (nano::dev_genesis_key.pub)
.previous (nano::genesis_hash)
.representative (nano::dev_genesis_key.pub)
.link (key.pub)
.balance (nano::genesis_amount - nano::Gxrb_ratio)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (nano::genesis_hash))
.build_shared ();
auto receive = builder.make_block ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.link (send->hash ())
.balance (nano::Gxrb_ratio)
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*send).code);
ASSERT_EQ (nano::process_result::progress, node.process (*receive).code);
nano::blocks_confirm (node, { send, receive }, true);
// Second, process two eligble transactions
auto block0 = builder.make_block ()
.account (nano::dev_genesis_key.pub)
.previous (send->hash ())
.representative (nano::dev_genesis_key.pub)
.link (nano::dev_genesis_key.pub)
.balance (nano::genesis_amount - 2 * nano::Gxrb_ratio)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (send->hash ()))
.build_shared ();
auto block1 = builder.make_block ()
.account (key.pub)
.previous (receive->hash ())
.representative (key.pub)
.link (key.pub)
.balance (0)
.sign (key.prv, key.pub)
.work (*system.work.generate (receive->hash ()))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*block0).code);
ASSERT_EQ (nano::process_result::progress, node.process (*block1).code);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
// There is vacancy so it should be inserted
ASSERT_TIMELY (1s, node.active.size () == 1);
node.scheduler.activate (key.pub, node.store.tx_begin_read ());
// There is no vacancy so it should stay queued
ASSERT_TIMELY (1s, node.scheduler.size () == 1);
auto election3 = node.active.election (block0->qualified_root ());
ASSERT_NE (nullptr, election3);
election3->force_confirm ();
// Election completed, next in queue should begin
ASSERT_TIMELY (1s, node.scheduler.size () == 0);
ASSERT_TIMELY (1s, node.active.size () == 1);
auto election4 = node.active.election (block1->qualified_root ());
ASSERT_NE (nullptr, election4);
}

View file

@ -767,7 +767,8 @@ TEST (votes, check_signature)
auto transaction (node1.store.tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, node1.ledger.process (transaction, *send1).code);
}
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
node1.scheduler.flush ();
auto election1 = node1.active.election (send1->qualified_root ());
ASSERT_EQ (1, election1->votes ().size ());
auto vote1 (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 1, send1));
@ -789,6 +790,7 @@ TEST (votes, add_one)
auto transaction (node1.store.tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, node1.ledger.process (transaction, *send1).code);
node1.block_confirm (send1);
node1.scheduler.flush ();
auto election1 = node1.active.election (send1->qualified_root ());
ASSERT_EQ (1, election1->votes ().size ());
auto vote1 (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 1, send1));
@ -808,24 +810,25 @@ TEST (votes, add_one)
TEST (votes, add_two)
{
nano::system system (1);
auto & node1 (*system.nodes[0]);
nano::system system{ 1 };
auto & node1 = *system.nodes[0];
nano::genesis genesis;
nano::keypair key1;
auto send1 (std::make_shared<nano::send_block> (genesis.hash (), key1.pub, nano::genesis_amount - 100, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0));
auto send1 = std::make_shared<nano::send_block> (genesis.hash (), key1.pub, nano::genesis_amount - 100, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0);
node1.work_generate_blocking (*send1);
auto transaction (node1.store.tx_begin_write ());
auto transaction = node1.store.tx_begin_write ();
ASSERT_EQ (nano::process_result::progress, node1.ledger.process (transaction, *send1).code);
node1.block_confirm (send1);
node1.scheduler.flush ();
auto election1 = node1.active.election (send1->qualified_root ());
nano::keypair key2;
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0));
auto vote2 (std::make_shared<nano::vote> (key2.pub, key2.prv, 1, send2));
auto send2 = std::make_shared<nano::send_block> (genesis.hash (), key2.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0);
auto vote2 = std::make_shared<nano::vote> (key2.pub, key2.prv, 1, send2);
ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote2));
auto vote1 (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 1, send1));
auto vote1 = std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 1, send1);
ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote1));
ASSERT_EQ (3, election1->votes ().size ());
auto votes1 (election1->votes ());
auto votes1 = election1->votes ();
ASSERT_NE (votes1.end (), votes1.find (nano::dev_genesis_key.pub));
ASSERT_EQ (send1->hash (), votes1[nano::dev_genesis_key.pub].hash);
ASSERT_NE (votes1.end (), votes1.find (key2.pub));
@ -856,7 +859,8 @@ TEST (votes, add_existing)
.build ();
node1.work_generate_blocking (*send1);
ASSERT_EQ (nano::process_result::progress, node1.ledger.process (node1.store.tx_begin_write (), *send1).code);
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
node1.scheduler.flush ();
auto election1 = node1.active.election (send1->qualified_root ());
auto vote1 (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 1, send1));
ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote1));
@ -907,6 +911,7 @@ TEST (votes, add_old)
auto transaction (node1.store.tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, node1.ledger.process (transaction, *send1).code);
node1.block_confirm (send1);
node1.scheduler.flush ();
auto election1 = node1.active.election (send1->qualified_root ());
auto vote1 (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 2, send1));
auto channel (std::make_shared<nano::transport::channel_loopback> (node1));
@ -981,6 +986,7 @@ TEST (votes, add_cooldown)
auto transaction (node1.store.tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, node1.ledger.process (transaction, *send1).code);
node1.block_confirm (send1);
node1.scheduler.flush ();
auto election1 = node1.active.election (send1->qualified_root ());
auto vote1 (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 1, send1));
auto channel (std::make_shared<nano::transport::channel_loopback> (node1));

View file

@ -332,7 +332,7 @@ TEST (receivable_processor, confirm_insufficient_pos)
auto block1 (std::make_shared<nano::send_block> (genesis.hash (), 0, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0));
node1.work_generate_blocking (*block1);
ASSERT_EQ (nano::process_result::progress, node1.process (*block1).code);
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
nano::keypair key1;
auto vote (std::make_shared<nano::vote> (key1.pub, key1.prv, 0, block1));
nano::confirm_ack con1 (vote);
@ -347,7 +347,7 @@ TEST (receivable_processor, confirm_sufficient_pos)
auto block1 (std::make_shared<nano::send_block> (genesis.hash (), 0, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0));
node1.work_generate_blocking (*block1);
ASSERT_EQ (nano::process_result::progress, node1.process (*block1).code);
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto vote (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 0, block1));
nano::confirm_ack con1 (vote);
node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ()));

View file

@ -1079,6 +1079,7 @@ TEST (node, fork_publish)
node1.work_generate_blocking (*send2);
node1.process_active (send1);
node1.block_processor.flush ();
node1.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
auto election (node1.active.election (send1->qualified_root ()));
ASSERT_NE (nullptr, election);
@ -1159,8 +1160,10 @@ TEST (node, fork_keep)
.build_shared ();
node1.process_active (send1);
node1.block_processor.flush ();
node1.scheduler.flush ();
node2.process_active (send1);
node2.block_processor.flush ();
node2.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
ASSERT_EQ (1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
@ -1214,9 +1217,11 @@ TEST (node, fork_flip)
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (publish1, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
auto channel2 (node2.network.udp_channels.create (node1.network.endpoint ()));
node2.network.process_message (publish2, channel2);
node2.block_processor.flush ();
node2.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
ASSERT_EQ (1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
@ -1289,7 +1294,9 @@ TEST (node, fork_multi_flip)
node2.network.process_message (publish2, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.network.process_message (publish3, node2.network.udp_channels.create (node2.network.endpoint ()));
node1.block_processor.flush ();
node1.scheduler.flush ();
node2.block_processor.flush ();
node2.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
ASSERT_EQ (1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
@ -1380,6 +1387,7 @@ TEST (node, fork_open)
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (publish1, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
auto election = node1.active.election (publish1.block->qualified_root ());
ASSERT_NE (nullptr, election);
election->force_confirm ();
@ -1395,6 +1403,7 @@ TEST (node, fork_open)
nano::publish publish2 (open1);
node1.network.process_message (publish2, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
auto open2 = builder.make_block ()
.source (publish1.block->hash ())
@ -1407,6 +1416,7 @@ TEST (node, fork_open)
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
node1.network.process_message (publish3, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
election = node1.active.election (publish3.block->qualified_root ());
ASSERT_EQ (2, election->blocks ().size ());
ASSERT_EQ (publish2.block->hash (), election->winner ()->hash ());
@ -1867,7 +1877,9 @@ TEST (node, rep_self_vote)
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node0->process (*block0).code);
auto & active = node0->active;
active.activate (nano::dev_genesis_key.pub);
auto & scheduler = node0->scheduler;
scheduler.activate (nano::dev_genesis_key.pub, node0->store.tx_begin_read ());
scheduler.flush ();
auto election1 = active.election (block0->qualified_root ());
ASSERT_NE (nullptr, election1);
// Wait until representatives are activated & make vote
@ -2528,6 +2540,7 @@ TEST (node, online_reps_election)
.build_shared ();
node1.process_active (send1);
node1.block_processor.flush ();
node1.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
// Process vote for ongoing election
auto vote = std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, nano::milliseconds_since_epoch (), std::vector<nano::block_hash>{ send1->hash () });
@ -2631,11 +2644,11 @@ TEST (node, block_arrival_time)
TEST (node, confirm_quorum)
{
nano::system system (1);
auto & node1 (*system.nodes[0]);
auto & node1 = *system.nodes[0];
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
// Put greater than node.delta () in pending so quorum can't be reached
nano::amount new_balance (node1.online_reps.delta () - nano::Gxrb_ratio);
nano::amount new_balance = node1.online_reps.delta () - nano::Gxrb_ratio;
auto send1 = nano::state_block_builder ()
.account (nano::dev_genesis_key.pub)
.previous (genesis.hash ())
@ -2647,8 +2660,8 @@ TEST (node, confirm_quorum)
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node1.process (*send1).code);
system.wallet (0)->send_action (nano::dev_genesis_key.pub, nano::dev_genesis_key.pub, new_balance.number ());
ASSERT_TIMELY (10s, !node1.active.empty ());
auto election (node1.active.election (nano::qualified_root (send1->hash (), send1->hash ())));
ASSERT_TIMELY (1s, node1.active.election (send1->qualified_root ()));
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_FALSE (election->confirmed ());
ASSERT_EQ (1, election->votes ().size ());
@ -4023,8 +4036,10 @@ TEST (node, rollback_vote_self)
ASSERT_EQ (weight, node.weight (key.pub));
node.process_active (send2);
node.block_processor.flush ();
node.scheduler.flush ();
node.process_active (fork);
node.block_processor.flush ();
node.scheduler.flush ();
election = node.active.election (send2->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (2, election->blocks ().size ());
@ -4478,6 +4493,7 @@ TEST (node, deferred_dependent_elections)
.build_shared ();
node.process_local (send1);
node.block_processor.flush ();
node.scheduler.flush ();
auto election_send1 = node.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election_send1);
@ -4636,6 +4652,7 @@ TEST (node, pruning_automatic)
node1.process_active (send1);
node1.process_active (send2);
node1.block_processor.flush ();
node1.scheduler.flush ();
// Confirm last block to prune previous
{
auto election = node1.active.election (send1->qualified_root ());
@ -4689,6 +4706,7 @@ TEST (node, pruning_age)
node1.process_active (send1);
node1.process_active (send2);
node1.block_processor.flush ();
node1.scheduler.flush ();
// Confirm last block to prune previous
{
auto election = node1.active.election (send1->qualified_root ());
@ -4721,7 +4739,7 @@ TEST (node, pruning_age)
TEST (node, pruning_depth)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
nano::node_config node_config{ nano::get_available_port (), system.logging };
node_config.enable_voting = false; // Remove after allowing pruned voting
nano::node_flags node_flags;
node_flags.enable_pruning = true;
@ -4746,6 +4764,7 @@ TEST (node, pruning_depth)
node1.process_active (send1);
node1.process_active (send2);
node1.block_processor.flush ();
node1.scheduler.flush ();
// Confirm last block to prune previous
auto election1 = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election1);

View file

@ -69,14 +69,14 @@ TEST (vote_processor, flush)
TEST (vote_processor, invalid_signature)
{
nano::system system (1);
auto & node (*system.nodes[0]);
nano::system system{ 1 };
auto & node = *system.nodes[0];
nano::genesis genesis;
nano::keypair key;
auto vote (std::make_shared<nano::vote> (key.pub, key.prv, 1, std::vector<nano::block_hash>{ genesis.open->hash () }));
auto vote = std::make_shared<nano::vote> (key.pub, key.prv, 1, std::vector<nano::block_hash>{ genesis.open->hash () });
auto vote_invalid = std::make_shared<nano::vote> (*vote);
vote_invalid->signature.bytes[0] ^= 1;
auto channel (std::make_shared<nano::transport::channel_loopback> (node));
auto channel = std::make_shared<nano::transport::channel_loopback> (node);
genesis.open->sideband_set (nano::block_sideband (nano::genesis_account, 0, nano::genesis_amount, 1, nano::seconds_since_epoch (), nano::epoch::epoch_0, false, false, false, nano::epoch::epoch_0));
node.block_confirm (genesis.open);

View file

@ -1244,14 +1244,14 @@ TEST (wallet, receive_pruned)
nano::system system;
nano::node_flags node_flags;
node_flags.disable_request_loop = true;
auto & node1 (*system.add_node (node_flags));
auto & node1 = *system.add_node (node_flags);
node_flags.enable_pruning = true;
nano::node_config config (nano::get_available_port (), system.logging);
config.enable_voting = false; // Remove after allowing pruned voting
auto & node2 (*system.add_node (config, node_flags));
auto & node2 = *system.add_node (config, node_flags);
auto & wallet1 (*system.wallet (0));
auto & wallet2 (*system.wallet (1));
auto & wallet1 = *system.wallet (0);
auto & wallet2 = *system.wallet (1);
nano::keypair key;
nano::state_block_builder builder;
@ -1265,7 +1265,7 @@ TEST (wallet, receive_pruned)
// Pruning
ASSERT_TIMELY (5s, node2.ledger.cache.cemented_count == 3);
{
auto transaction (node2.store.tx_begin_write ());
auto transaction = node2.store.tx_begin_write ();
ASSERT_EQ (1, node2.ledger.pruning_action (transaction, send1->hash (), 2));
}
ASSERT_EQ (1, node2.ledger.cache.pruned_count);

View file

@ -5,7 +5,7 @@
using namespace std::chrono_literals;
TEST (work_watcher, update)
TEST (work_watcher, DISABLED_update)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
@ -15,15 +15,17 @@ TEST (work_watcher, update)
nano::node_flags node_flags;
node_flags.disable_request_loop = true;
auto & node = *system.add_node (node_config, node_flags);
auto & wallet (*system.wallet (0));
auto & wallet = *system.wallet (0);
wallet.insert_adhoc (nano::dev_genesis_key.prv);
nano::keypair key;
auto const block1 (wallet.send_action (nano::dev_genesis_key.pub, key.pub, 100));
auto difficulty1 (block1->difficulty ());
auto multiplier1 (nano::normalized_multiplier (nano::difficulty::to_multiplier (difficulty1, nano::work_threshold (block1->work_version (), nano::block_details (nano::epoch::epoch_0, true, false, false))), node.network_params.network.publish_thresholds.epoch_1));
auto const block2 (wallet.send_action (nano::dev_genesis_key.pub, key.pub, 200));
auto difficulty2 (block2->difficulty ());
auto multiplier2 (nano::normalized_multiplier (nano::difficulty::to_multiplier (difficulty2, nano::work_threshold (block2->work_version (), nano::block_details (nano::epoch::epoch_0, true, false, false))), node.network_params.network.publish_thresholds.epoch_1));
auto const block1 = wallet.send_action (nano::dev_genesis_key.pub, key.pub, 100);
auto difficulty1 = block1->difficulty ();
auto multiplier1 = nano::normalized_multiplier (nano::difficulty::to_multiplier (difficulty1, nano::work_threshold (block1->work_version (), nano::block_details (nano::epoch::epoch_0, true, false, false))), node.network_params.network.publish_thresholds.epoch_1);
auto const block2 = wallet.send_action (nano::dev_genesis_key.pub, key.pub, 200);
auto difficulty2 = block2->difficulty ();
auto multiplier2 = nano::normalized_multiplier (nano::difficulty::to_multiplier (difficulty2, nano::work_threshold (block2->work_version (), nano::block_details (nano::epoch::epoch_0, true, false, false))), node.network_params.network.publish_thresholds.epoch_1);
node.block_processor.flush ();
node.scheduler.flush ();
double updated_multiplier1{ multiplier1 }, updated_multiplier2{ multiplier2 }, target_multiplier{ std::max (multiplier1, multiplier2) + 1e-6 };
{
nano::lock_guard<nano::mutex> guard (node.active.mutex);
@ -35,13 +37,13 @@ TEST (work_watcher, update)
{
nano::lock_guard<nano::mutex> guard (node.active.mutex);
{
auto const existing (node.active.roots.find (block1->qualified_root ()));
auto const existing = node.active.roots.find (block1->qualified_root ());
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated_multiplier1 = existing->multiplier;
}
{
auto const existing (node.active.roots.find (block2->qualified_root ()));
auto const existing = node.active.roots.find (block2->qualified_root ());
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated_multiplier2 = existing->multiplier;

View file

@ -87,6 +87,8 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::db_parallel_traversal:
thread_role_name_string = "DB par traversl";
break;
case nano::thread_role::name::election_scheduler:
thread_role_name_string = "Election Sched";
}
/*

View file

@ -38,7 +38,8 @@ namespace thread_role
request_aggregator,
state_block_signature_verification,
epoch_upgrader,
db_parallel_traversal
db_parallel_traversal,
election_scheduler
};
/*
* Get/Set the identifier for the current thread

View file

@ -56,6 +56,8 @@ add_library(
distributed_work_factory.cpp
election.hpp
election.cpp
election_scheduler.hpp
election_scheduler.cpp
gap_cache.hpp
gap_cache.cpp
ipc/action_handler.hpp

View file

@ -19,14 +19,15 @@ size_t constexpr nano::active_transactions::max_active_elections_frontier_insert
constexpr std::chrono::minutes nano::active_transactions::expired_optimistic_election_info_cutoff;
nano::active_transactions::active_transactions (nano::node & node_a, nano::confirmation_height_processor & confirmation_height_processor_a) :
confirmation_height_processor (confirmation_height_processor_a),
node (node_a),
multipliers_cb (20, 1.),
trended_active_multiplier (1.0),
generator (node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network, node_a.stats),
check_all_elections_period (node_a.network_params.network.is_dev_network () ? 10ms : 5s),
election_time_to_live (node_a.network_params.network.is_dev_network () ? 0s : 2s),
prioritized_cutoff (std::max<size_t> (1, node_a.config.active_elections_size / 10)),
scheduler{ node_a.scheduler }, // Move dependencies requiring this circular reference
confirmation_height_processor{ confirmation_height_processor_a },
node{ node_a },
multipliers_cb{ 20, 1. },
trended_active_multiplier{ 1.0 },
generator{ node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network, node_a.stats },
check_all_elections_period{ node_a.network_params.network.is_dev_network () ? 10ms : 5s },
election_time_to_live{ node_a.network_params.network.is_dev_network () ? 0s : 2s },
prioritized_cutoff{ std::max<size_t> (1, node_a.config.active_elections_size / 10) },
thread ([this] () {
nano::thread_role::set (nano::thread_role::name::request_loop);
request_loop ();
@ -246,13 +247,13 @@ void nano::active_transactions::block_cemented_callback (std::shared_ptr<nano::b
if (cemented_bootstrap_count_reached && was_active && low_active_elections ())
{
// Start or vote for the next unconfirmed block
activate (account);
scheduler.activate (account, transaction);
// Start or vote for the next unconfirmed block in the destination account
auto const & destination (node.ledger.block_destination (transaction, *block_a));
if (!destination.is_zero () && destination != account)
{
activate (destination);
scheduler.activate (destination, transaction);
}
}
}
@ -590,11 +591,6 @@ void nano::active_transactions::request_loop ()
const auto stamp_l = std::chrono::steady_clock::now ();
// frontiers_confirmation should be above update_active_multiplier to ensure new sorted roots are updated
if (should_do_frontiers_confirmation ())
{
frontiers_confirmation (lock);
}
update_active_multiplier (lock);
request_confirm (lock);
@ -867,12 +863,6 @@ nano::election_insertion_result nano::active_transactions::insert_impl (nano::un
return result;
}
nano::election_insertion_result nano::active_transactions::insert (std::shared_ptr<nano::block> const & block_a, boost::optional<nano::uint128_t> const & previous_balance_a, nano::election_behavior election_behavior_a, std::function<void (std::shared_ptr<nano::block> const &)> const & confirmation_action_a)
{
nano::unique_lock<nano::mutex> lock (mutex);
return insert_impl (lock, block_a, previous_balance_a, election_behavior_a, confirmation_action_a);
}
// Validate a vote and apply it to the current election if one exists
nano::vote_code nano::active_transactions::vote (std::shared_ptr<nano::vote> const & vote_a)
{
@ -989,34 +979,6 @@ std::shared_ptr<nano::block> nano::active_transactions::winner (nano::block_hash
return result;
}
nano::election_insertion_result nano::active_transactions::activate (nano::account const & account_a)
{
nano::election_insertion_result result;
auto transaction (node.store.tx_begin_read ());
nano::account_info account_info;
if (!node.store.account_get (transaction, account_a, account_info))
{
nano::confirmation_height_info conf_info;
node.store.confirmation_height_get (transaction, account_a, conf_info);
if (conf_info.height < account_info.block_count)
{
debug_assert (conf_info.frontier != account_info.head);
auto hash = conf_info.height == 0 ? account_info.open_block : node.store.block_successor (transaction, conf_info.frontier);
auto block = node.store.block_get (transaction, hash);
release_assert (block != nullptr);
if (node.ledger.dependents_confirmed (transaction, *block))
{
result = insert (block);
if (result.inserted)
{
result.election->transition_active ();
}
}
}
}
return result;
}
bool nano::active_transactions::update_difficulty (std::shared_ptr<nano::block> const & block_a, bool flood_update)
{
nano::unique_lock<nano::mutex> lock (mutex);
@ -1072,7 +1034,7 @@ void nano::active_transactions::restart (nano::transaction const & transaction_a
auto previous_balance = node.ledger.balance (transaction_a, ledger_block->previous ());
auto block_has_account = ledger_block->type () == nano::block_type::state || ledger_block->type () == nano::block_type::open;
auto account = block_has_account ? ledger_block->account () : ledger_block->sideband ().account;
activate (account);
scheduler.activate (account, transaction_a);
}
}
}

View file

@ -29,6 +29,7 @@ class node;
class block;
class block_sideband;
class election;
class election_scheduler;
class vote;
class transaction;
class confirmation_height_processor;
@ -155,11 +156,6 @@ public:
explicit active_transactions (nano::node &, nano::confirmation_height_processor &);
~active_transactions ();
// Start an election for a block
// Call action with confirmed block, may be different than what we started with
// clang-format off
nano::election_insertion_result insert (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, nano::election_behavior = nano::election_behavior::normal, std::function<void(std::shared_ptr<nano::block> const&)> const & = nullptr);
// clang-format on
// Distinguishes replay votes, cannot be determined if the block is not in any election
nano::vote_code vote (std::shared_ptr<nano::vote> const &);
// Is the root of this block in the roots container
@ -167,8 +163,6 @@ public:
bool active (nano::qualified_root const &);
std::shared_ptr<nano::election> election (nano::qualified_root const &) const;
std::shared_ptr<nano::block> winner (nano::block_hash const &) const;
// Activates the first unconfirmed block of \p account_a
nano::election_insertion_result activate (nano::account const &);
// Returns false if the election difficulty was updated
bool update_difficulty (std::shared_ptr<nano::block> const &, bool);
// Returns false if the election was restarted
@ -207,6 +201,7 @@ public:
void trigger_inactive_votes_cache_election (std::shared_ptr<nano::block> const &);
nano::inactive_cache_information find_inactive_votes_cache (nano::block_hash const &);
void erase_inactive_votes_cache (nano::block_hash const &);
nano::election_scheduler & scheduler;
nano::confirmation_height_processor & confirmation_height_processor;
nano::node & node;
mutable nano::mutex mutex{ mutex_identifier (mutexes::active) };
@ -333,6 +328,7 @@ private:
boost::thread thread;
friend class election;
friend class election_scheduler;
friend std::unique_ptr<container_info_component> collect_container_info (active_transactions &, const std::string &);
friend class active_transactions_vote_replays_Test;

View file

@ -356,7 +356,8 @@ void nano::block_processor::process_live (nano::transaction const & transaction_
// Start collecting quorum on block
if (watch_work_a || node.ledger.dependents_confirmed (transaction_a, *block_a))
{
node.active.insert (block_a, process_return_a.previous_balance.number ());
auto account = block_a->account ().is_zero () ? block_a->sideband ().account : block_a->account ();
node.scheduler.activate (account, transaction_a);
}
else
{

View file

@ -0,0 +1,126 @@
#include <nano/node/election_scheduler.hpp>
#include <nano/node/node.hpp>
nano::election_scheduler::election_scheduler (nano::node & node) :
node{ node },
stopped{ false },
thread{ [this] () { run (); } }
{
}
nano::election_scheduler::~election_scheduler ()
{
stop ();
thread.join ();
}
void nano::election_scheduler::manual (std::shared_ptr<nano::block> const & block_a, boost::optional<nano::uint128_t> const & previous_balance_a, nano::election_behavior election_behavior_a, std::function<void (std::shared_ptr<nano::block> const &)> const & confirmation_action_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
manual_queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a, confirmation_action_a));
observe ();
}
void nano::election_scheduler::activate (nano::account const & account_a, nano::transaction const & transaction)
{
debug_assert (!account_a.is_zero ());
nano::account_info account_info;
if (!node.store.account_get (transaction, account_a, account_info))
{
nano::confirmation_height_info conf_info;
node.store.confirmation_height_get (transaction, account_a, conf_info);
if (conf_info.height < account_info.block_count)
{
debug_assert (conf_info.frontier != account_info.head);
auto hash = conf_info.height == 0 ? account_info.open_block : node.store.block_successor (transaction, conf_info.frontier);
auto block = node.store.block_get (transaction, hash);
debug_assert (block != nullptr);
if (node.ledger.dependents_confirmed (transaction, *block))
{
nano::lock_guard<nano::mutex> lock{ mutex };
priority.push (account_info.modified, block);
observe ();
}
}
}
}
void nano::election_scheduler::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
stopped = true;
observe ();
}
void nano::election_scheduler::flush ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto priority_target = priority_queued + priority.size ();
auto manual_target = manual_queued + manual_queue.size ();
condition.wait (lock, [this, &priority_target, &manual_target] () {
return priority_queued >= priority_target && manual_queued >= manual_target;
});
}
void nano::election_scheduler::observe ()
{
condition.notify_all ();
}
size_t nano::election_scheduler::size () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return priority.size () + manual_queue.size ();
}
bool nano::election_scheduler::empty () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return priority.empty () && manual_queue.empty ();
}
size_t nano::election_scheduler::priority_queue_size () const
{
return priority.size ();
}
void nano::election_scheduler::run ()
{
nano::thread_role::set (nano::thread_role::name::election_scheduler);
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait (lock, [this] () {
auto vacancy = node.active.vacancy ();
auto has_vacancy = vacancy > 0;
auto available = !priority.empty () || !manual_queue.empty ();
return stopped || (has_vacancy && available);
});
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds
if (!stopped)
{
if (!priority.empty ())
{
auto block = priority.top ();
std::shared_ptr<nano::election> election;
nano::unique_lock<nano::mutex> lock2 (node.active.mutex);
election = node.active.insert_impl (lock2, block).election;
if (election != nullptr)
{
election->transition_active ();
}
priority.pop ();
++priority_queued;
}
if (!manual_queue.empty ())
{
auto const [block, previous_balance, election_behavior, confirmation_action] = manual_queue.front ();
nano::unique_lock<nano::mutex> lock2 (node.active.mutex);
node.active.insert_impl (lock2, block, previous_balance, election_behavior, confirmation_action);
manual_queue.pop_front ();
++manual_queued;
}
observe ();
}
}
}

View file

@ -0,0 +1,47 @@
#pragma once
#include <nano/lib/numbers.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/prioritization.hpp>
#include <boost/optional.hpp>
#include <condition_variable>
#include <deque>
#include <memory>
#include <thread>
namespace nano
{
class block;
class node;
class election_scheduler final
{
public:
election_scheduler (nano::node & node);
~election_scheduler ();
// Manualy start an election for a block
// Call action with confirmed block, may be different than what we started with
void manual (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, nano::election_behavior = nano::election_behavior::normal, std::function<void (std::shared_ptr<nano::block> const &)> const & = nullptr);
// Activates the first unconfirmed block of \p account_a
void activate (nano::account const &, nano::transaction const &);
void stop ();
void flush ();
void observe ();
size_t size () const;
bool empty () const;
size_t priority_queue_size () const;
private:
void run ();
nano::prioritization priority;
uint64_t priority_queued{ 0 };
std::deque<std::tuple<std::shared_ptr<nano::block>, boost::optional<nano::uint128_t>, nano::election_behavior, std::function<void (std::shared_ptr<nano::block>)>>> manual_queue;
uint64_t manual_queued{ 0 };
nano::node & node;
bool stopped;
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
};
}

View file

@ -117,6 +117,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
vote_uniquer (block_uniquer),
confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, config.logging, logger, node_initialized_latch, flags.confirmation_height_processor_mode),
active (*this, confirmation_height_processor),
scheduler{ *this },
aggregator (network_params.network, config, stats, active.generator, history, ledger, wallets, active),
wallets (wallets_store.init_error (), *this),
startup_time (std::chrono::steady_clock::now ()),
@ -126,6 +127,8 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
{
telemetry->start ();
active.vacancy_update = [this] () { scheduler.observe (); };
if (config.websocket_config.enabled)
{
auto endpoint_l (nano::tcp_endpoint (boost::asio::ip::make_address_v6 (config.websocket_config.address), config.websocket_config.port));
@ -650,6 +653,12 @@ void nano::node::start ()
{
port_mapping.start ();
}
if (config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled)
{
workers.push_task ([this_l = shared ()] () {
this_l->ongoing_backlog_population ();
});
}
}
void nano::node::stop ()
@ -663,6 +672,7 @@ void nano::node::stop ()
block_processor.stop ();
aggregator.stop ();
vote_processor.stop ();
scheduler.stop ();
active.stop ();
confirmation_height_processor.stop ();
network.stop ();
@ -957,12 +967,20 @@ void nano::node::unchecked_cleanup ()
void nano::node::ongoing_unchecked_cleanup ()
{
unchecked_cleanup ();
auto this_l (shared ());
workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.unchecked_cleaning_interval, [this_l] () {
workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.unchecked_cleaning_interval, [this_l = shared ()] () {
this_l->ongoing_unchecked_cleanup ();
});
}
void nano::node::ongoing_backlog_population ()
{
populate_backlog ();
auto delay = config.network_params.network.is_dev_network () ? std::chrono::seconds{ 1 } : std::chrono::duration_cast<std::chrono::seconds> (std::chrono::minutes{ 5 });
workers.add_timed_task (std::chrono::steady_clock::now () + delay, [this_l = shared ()] () {
this_l->ongoing_backlog_population ();
});
}
bool nano::node::collect_ledger_pruning_targets (std::deque<nano::block_hash> & pruning_targets_a, nano::account & last_account_a, uint64_t const batch_read_size_a, uint64_t const max_depth_a, uint64_t const cutoff_time_a)
{
uint64_t read_operations (0);
@ -1219,10 +1237,12 @@ void nano::node::add_initial_peers ()
void nano::node::block_confirm (std::shared_ptr<nano::block> const & block_a)
{
auto election = active.insert (block_a);
if (election.inserted)
scheduler.manual (block_a);
scheduler.flush ();
auto election = active.election (block_a->qualified_root ());
if (election != nullptr)
{
election.election->transition_active ();
election->transition_active ();
}
}
@ -1715,6 +1735,26 @@ std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> nano::node::get_
return { max_blocks, weights };
}
void nano::node::populate_backlog ()
{
auto done = false;
uint64_t const chunk_size = 65536;
nano::account next = 0;
uint64_t total = 0;
while (!stopped && !done)
{
auto transaction = store.tx_begin_read ();
auto count = 0;
for (auto i = store.accounts_begin (transaction, next), n = store.accounts_end (); !stopped && i != n && count < chunk_size; ++i, ++count, ++total)
{
auto const & account = i->first;
scheduler.activate (account, transaction);
next = account.number () + 1;
}
done = store.accounts_begin (transaction, next) == store.accounts_end ();
}
}
nano::node_wrapper::node_wrapper (boost::filesystem::path const & path_a, boost::filesystem::path const & config_path_a, nano::node_flags const & node_flags_a) :
io_context (std::make_shared<boost::asio::io_context> ()),
work (1)

View file

@ -11,6 +11,7 @@
#include <nano/node/confirmation_height_processor.hpp>
#include <nano/node/distributed_work_factory.hpp>
#include <nano/node/election.hpp>
#include <nano/node/election_scheduler.hpp>
#include <nano/node/gap_cache.hpp>
#include <nano/node/network.hpp>
#include <nano/node/node_observers.hpp>
@ -118,6 +119,7 @@ public:
void ongoing_bootstrap ();
void ongoing_peer_store ();
void ongoing_unchecked_cleanup ();
void ongoing_backlog_population ();
void backup_wallet ();
void search_pending ();
void bootstrap_wallet ();
@ -147,6 +149,7 @@ public:
bool init_error () const;
bool epoch_upgrader (nano::raw_key const &, nano::epoch, uint64_t, uint64_t);
std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> get_bootstrap_weights () const;
void populate_backlog ();
nano::write_database_queue write_database_queue;
boost::asio::io_context & io_ctx;
boost::latch node_initialized_latch;
@ -185,6 +188,7 @@ public:
nano::vote_uniquer vote_uniquer;
nano::confirmation_height_processor confirmation_height_processor;
nano::active_transactions active;
nano::election_scheduler scheduler;
nano::request_aggregator aggregator;
nano::wallets wallets;
const std::chrono::steady_clock::time_point startup_time;

View file

@ -2131,7 +2131,7 @@ TEST (rpc, process_subtype_open)
ASSERT_EQ (nano::process_result::progress, node1.process (*send).code);
ASSERT_EQ (nano::process_result::progress, node2.process (*send).code);
scoped_io_thread_name_change scoped_thread_name_io;
node1.active.insert (send);
node1.scheduler.manual (send);
nano::state_block open (key.pub, 0, key.pub, nano::Gxrb_ratio, send->hash (), key.prv, key.pub, *node1.work_generate_blocking (key.pub));
nano::node_rpc_config node_rpc_config;
nano::ipc::ipc_server ipc_server (node1, node_rpc_config);
@ -2174,7 +2174,7 @@ TEST (rpc, process_subtype_receive)
ASSERT_EQ (nano::process_result::progress, node1.process (*send).code);
ASSERT_EQ (nano::process_result::progress, node2.process (*send).code);
scoped_io_thread_name_change scoped_thread_name_io;
node1.active.insert (send);
node1.scheduler.manual (send);
nano::state_block receive (nano::dev_genesis_key.pub, send->hash (), nano::dev_genesis_key.pub, nano::genesis_amount, send->hash (), nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *node1.work_generate_blocking (send->hash ()));
nano::node_rpc_config node_rpc_config;
nano::ipc::ipc_server ipc_server (node1, node_rpc_config);

View file

@ -840,7 +840,7 @@ nano::uint128_t nano::ledger::account_pending (nano::transaction const & transac
nano::pending_info const & info (i->second);
if (only_confirmed_a)
{
if (block_confirmed_or_pruned_exists (transaction_a, i->first.hash))
if (block_confirmed (transaction_a, i->first.hash))
{
result += info.amount.number ();
}

View file

@ -535,7 +535,7 @@ TEST (confirmation_height, many_accounts_single_confirmation)
{
auto block = node->block (last_open_hash);
ASSERT_NE (nullptr, block);
node->active.insert (block);
node->scheduler.manual (block);
auto election = node->active.election (block->qualified_root ());
ASSERT_NE (nullptr, election);
election->force_confirm ();
@ -603,7 +603,7 @@ TEST (confirmation_height, many_accounts_many_confirmations)
// Confirm all of the accounts
for (auto & open_block : open_blocks)
{
node->active.insert (open_block);
node->scheduler.manual (open_block);
auto election = node->active.election (open_block->qualified_root ());
ASSERT_NE (nullptr, election);
election->force_confirm ();
@ -690,7 +690,7 @@ TEST (confirmation_height, long_chains)
// Call block confirm on the existing receive block on the genesis account which will confirm everything underneath on both accounts
{
node->active.insert (receive1);
node->scheduler.manual (receive1);
auto election = node->active.election (receive1->qualified_root ());
ASSERT_NE (nullptr, election);
election->force_confirm ();
@ -1853,7 +1853,7 @@ TEST (node, wallet_create_block_confirm_conflicts)
// Call block confirm on the top level send block which will confirm everything underneath on both accounts.
{
auto block = node->store.block_get (node->store.tx_begin_read (), latest);
node->active.insert (block);
node->scheduler.manual (block);
auto election = node->active.election (block->qualified_root ());
ASSERT_NE (nullptr, election);
election->force_confirm ();