Remove recently dropped check to restart elections (#3136)

Up until "recently", a new write transaction was opened to update work on the ledger for every single block. Now with deferred work updates on the block processor, it is no more expensive than processing a new block, so it makes sense to remove this constraint of having been recently dropped. This improves quality of service.

The previously implicit check for confirmed dependents (since the election was dropped) is now explicit. The work on the ledger is updated regardless of that check passing.

The election is not immediately inserted as *active* anymore, same behavior as the normal election insertion path.

Note that if an election is active, the work is **not** updated on the ledger. That behavior also seems desirable. This could be achieved by updating the store after the block is identified as old, directly within `ledger::process`. For post-processing, a flag can be passed to `blockprocessor::process_old`, at which point `active_transactions::restart` can be scrapped since it becomes a simple election insertion + stats update (with a dependents confirmed check). Since this change would touch ledger code I am leaving for others to do it. There's also the question if the confirmed status should be checked within the ledger processing code.

Note: only tested via core_test.
This commit is contained in:
Guilherme Lawless 2021-03-15 10:58:45 +00:00 committed by GitHub
commit 45afd5b2ce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 36 additions and 110 deletions

View file

@ -130,7 +130,6 @@ TEST (active_transactions, keep_local)
auto send6 (wallet.send_action (nano::dev_genesis_key.pub, key6.pub, node.config.receive_minimum.number ())); auto send6 (wallet.send_action (nano::dev_genesis_key.pub, key6.pub, node.config.receive_minimum.number ()));
// should not drop wallet created transactions // should not drop wallet created transactions
ASSERT_TIMELY (5s, node.active.size () == 6); ASSERT_TIMELY (5s, node.active.size () == 6);
ASSERT_EQ (0, node.active.recently_dropped.size ());
for (auto const & block : { send1, send2, send3, send4, send5, send6 }) for (auto const & block : { send1, send2, send3, send4, send5, send6 })
{ {
auto election = node.active.election (block->qualified_root ()); auto election = node.active.election (block->qualified_root ());
@ -172,7 +171,6 @@ TEST (active_transactions, keep_local)
node.block_processor.flush (); node.block_processor.flush ();
// bound elections, should drop after one loop // bound elections, should drop after one loop
ASSERT_TIMELY (5s, node.active.size () == node_config.active_elections_size); ASSERT_TIMELY (5s, node.active.size () == node_config.active_elections_size);
ASSERT_EQ (1, node.active.recently_dropped.size ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::election_drop)); ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::election_drop));
} }
@ -711,14 +709,13 @@ TEST (active_transactions, dropped_cleanup)
// The filter must have been cleared // The filter must have been cleared
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
// Added as recently dropped // An election was recently dropped
ASSERT_NE (std::chrono::steady_clock::time_point{}, node.active.recently_dropped.find (block->qualified_root ())); ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::election_drop));
// Block cleared from active // Block cleared from active
ASSERT_EQ (0, node.active.blocks.count (block->hash ())); ASSERT_EQ (0, node.active.blocks.count (block->hash ()));
// Repeat test for a confirmed election // Repeat test for a confirmed election
node.active.recently_dropped.erase (block->qualified_root ());
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
election = node.active.insert (block).election; election = node.active.insert (block).election;
ASSERT_NE (nullptr, election); ASSERT_NE (nullptr, election);
@ -729,8 +726,8 @@ TEST (active_transactions, dropped_cleanup)
// The filter should not have been cleared // The filter should not have been cleared
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
// Not added as recently dropped // Not dropped
ASSERT_EQ (std::chrono::steady_clock::time_point{}, node.active.recently_dropped.find (block->qualified_root ())); ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::election_drop));
// Block cleared from active // Block cleared from active
ASSERT_EQ (0, node.active.blocks.count (block->hash ())); ASSERT_EQ (0, node.active.blocks.count (block->hash ()));
@ -1433,7 +1430,6 @@ TEST (active_transactions, restart_dropped)
.work (*system.work.generate (genesis.hash ())) .work (*system.work.generate (genesis.hash ()))
.build_shared (); // Process only in ledger and simulate dropping the election .build_shared (); // Process only in ledger and simulate dropping the election
ASSERT_EQ (nano::process_result::progress, node.process (*send).code); ASSERT_EQ (nano::process_result::progress, node.process (*send).code);
node.active.recently_dropped.add (send->qualified_root ());
// Generate higher difficulty work // Generate higher difficulty work
ASSERT_TRUE (node.work_generate_blocking (*send, send->difficulty () + 1).is_initialized ()); ASSERT_TRUE (node.work_generate_blocking (*send, send->difficulty () + 1).is_initialized ());
// Process the same block with updated work // Process the same block with updated work
@ -1446,8 +1442,6 @@ TEST (active_transactions, restart_dropped)
ASSERT_NE (nullptr, ledger_block); ASSERT_NE (nullptr, ledger_block);
// Exact same block, including work value must have been re-written // Exact same block, including work value must have been re-written
ASSERT_EQ (*send, *ledger_block); ASSERT_EQ (*send, *ledger_block);
// Removed from the dropped elections cache
ASSERT_EQ (std::chrono::steady_clock::time_point{}, node.active.recently_dropped.find (send->qualified_root ()));
// Drop election // Drop election
node.active.erase (*send); node.active.erase (*send);
ASSERT_EQ (0, node.active.size ()); ASSERT_EQ (0, node.active.size ());

View file

@ -4496,22 +4496,27 @@ TEST (node, deferred_dependent_elections)
ASSERT_TIMELY (2s, node2.block (send2->hash ())); ASSERT_TIMELY (2s, node2.block (send2->hash ()));
// Re-processing older blocks with updated work also does not start an election // Re-processing older blocks with updated work also does not start an election
node.work_generate_blocking (*open, open->difficulty ()); node.work_generate_blocking (*open, open->difficulty () + 1);
node.process_active (open); node.process_active (open);
node.block_processor.flush (); node.block_processor.flush ();
ASSERT_FALSE (node.active.active (open->qualified_root ())); ASSERT_FALSE (node.active.active (open->qualified_root ()));
/// However, work is still updated
ASSERT_TIMELY (3s, node.store.block_get (node.store.tx_begin_read (), open->hash ())->block_work () == open->block_work ());
// It is however possible to manually start an election from elsewhere // It is however possible to manually start an election from elsewhere
node.block_confirm (open); node.block_confirm (open);
ASSERT_TRUE (node.active.active (open->qualified_root ())); ASSERT_TRUE (node.active.active (open->qualified_root ()));
// Dropping an election allows restarting it [with higher work]
node.active.erase (*open); node.active.erase (*open);
ASSERT_FALSE (node.active.active (open->qualified_root ())); ASSERT_FALSE (node.active.active (open->qualified_root ()));
ASSERT_NE (std::chrono::steady_clock::time_point{}, node.active.recently_dropped.find (open->qualified_root ()));
/// The election was dropped but it's still not possible to restart it
node.work_generate_blocking (*open, open->difficulty () + 1);
ASSERT_FALSE (node.active.active (open->qualified_root ()));
node.process_active (open); node.process_active (open);
node.block_processor.flush (); node.block_processor.flush ();
ASSERT_TRUE (node.active.active (open->qualified_root ())); ASSERT_FALSE (node.active.active (open->qualified_root ()));
/// However, work is still updated
ASSERT_TIMELY (3s, node.store.block_get (node.store.tx_begin_read (), open->hash ())->block_work () == open->block_work ());
// Frontier confirmation also starts elections // Frontier confirmation also starts elections
ASSERT_NO_ERROR (system.poll_until_true (5s, [&node, &send2] { ASSERT_NO_ERROR (system.poll_until_true (5s, [&node, &send2] {
@ -4566,6 +4571,14 @@ TEST (node, deferred_dependent_elections)
node.process_active (fork); node.process_active (fork);
node.block_processor.flush (); node.block_processor.flush ();
ASSERT_TRUE (node.active.active (receive->qualified_root ())); ASSERT_TRUE (node.active.active (receive->qualified_root ()));
/// If dropped, the election can be restarted once higher work is provided
node.active.erase (*fork);
ASSERT_FALSE (node.active.active (fork->qualified_root ()));
node.work_generate_blocking (*receive, receive->difficulty () + 1);
node.process_active (receive);
node.block_processor.flush ();
ASSERT_TRUE (node.active.active (receive->qualified_root ()));
} }
} }

View file

@ -261,8 +261,6 @@ char const * nano::mutex_identifier (mutexes mutex)
return "blockstore_cache"; return "blockstore_cache";
case mutexes::confirmation_height_processor: case mutexes::confirmation_height_processor:
return "confirmation_height_processor"; return "confirmation_height_processor";
case mutexes::dropped_elections:
return "dropped_elections";
case mutexes::election_winner_details: case mutexes::election_winner_details:
return "election_winner_details"; return "election_winner_details";
case mutexes::gap_cache: case mutexes::gap_cache:

View file

@ -25,7 +25,6 @@ enum class mutexes
block_uniquer, block_uniquer,
blockstore_cache, blockstore_cache,
confirmation_height_processor, confirmation_height_processor,
dropped_elections,
election_winner_details, election_winner_details,
gap_cache, gap_cache,
network_filter, network_filter,

View file

@ -19,7 +19,6 @@ size_t constexpr nano::active_transactions::max_active_elections_frontier_insert
constexpr std::chrono::minutes nano::active_transactions::expired_optimistic_election_info_cutoff; constexpr std::chrono::minutes nano::active_transactions::expired_optimistic_election_info_cutoff;
nano::active_transactions::active_transactions (nano::node & node_a, nano::confirmation_height_processor & confirmation_height_processor_a) : nano::active_transactions::active_transactions (nano::node & node_a, nano::confirmation_height_processor & confirmation_height_processor_a) :
recently_dropped (node_a.stats),
confirmation_height_processor (confirmation_height_processor_a), confirmation_height_processor (confirmation_height_processor_a),
node (node_a), node (node_a),
multipliers_cb (20, 1.), multipliers_cb (20, 1.),
@ -358,7 +357,7 @@ void nano::active_transactions::cleanup_election (nano::unique_lock<nano::mutex>
if (!info_a.confirmed) if (!info_a.confirmed)
{ {
recently_dropped.add (info_a.root); node.stats.inc (nano::stat::type::election, nano::stat::detail::election_drop);
} }
for (auto const & [hash, block] : info_a.blocks) for (auto const & [hash, block] : info_a.blocks)
@ -1045,32 +1044,28 @@ bool nano::active_transactions::update_difficulty_impl (nano::active_transaction
bool nano::active_transactions::restart (nano::transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a) bool nano::active_transactions::restart (nano::transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a)
{ {
// Only guaranteed to restart the election if the new block is received within 2 minutes of its election being dropped
constexpr std::chrono::minutes recently_dropped_cutoff{ 2 };
bool error = true; bool error = true;
if (recently_dropped.find (block_a->qualified_root ()) > std::chrono::steady_clock::now () - recently_dropped_cutoff) auto hash (block_a->hash ());
auto ledger_block (node.store.block_get (transaction_a, hash));
if (ledger_block != nullptr && ledger_block->block_work () != block_a->block_work () && !node.block_confirmed_or_being_confirmed (transaction_a, hash))
{ {
auto hash (block_a->hash ()); if (block_a->difficulty () > ledger_block->difficulty ())
auto ledger_block (node.store.block_get (transaction_a, hash));
if (ledger_block != nullptr && ledger_block->block_work () != block_a->block_work () && !node.block_confirmed_or_being_confirmed (transaction_a, hash))
{ {
if (block_a->difficulty () > ledger_block->difficulty ()) // Re-writing the block is necessary to avoid the same work being received later to force restarting the election
// The existing block is re-written, not the arriving block, as that one might not have gone through a full signature check
ledger_block->block_work_set (block_a->block_work ());
// Deferred write
node.block_processor.update (ledger_block);
// Restart election for the upgraded block, previously dropped from elections
if (node.ledger.dependents_confirmed (transaction_a, *ledger_block))
{ {
// Re-writing the block is necessary to avoid the same work being received later to force restarting the election
// The existing block is re-written, not the arriving block, as that one might not have gone through a full signature check
ledger_block->block_work_set (block_a->block_work ());
// Queue for writing in the block processor to avoid opening a new write transaction for a single operation
node.block_processor.update (ledger_block);
// Restart election for the upgraded block, previously dropped from elections
auto previous_balance = node.ledger.balance (transaction_a, ledger_block->previous ()); auto previous_balance = node.ledger.balance (transaction_a, ledger_block->previous ());
auto insert_result = insert (ledger_block, previous_balance); auto insert_result = insert (ledger_block, previous_balance);
if (insert_result.inserted) if (insert_result.inserted)
{ {
error = false; error = false;
insert_result.election->transition_active ();
recently_dropped.erase (ledger_block->qualified_root ());
node.stats.inc (nano::stat::type::election, nano::stat::detail::election_restart); node.stats.inc (nano::stat::type::election, nano::stat::detail::election_restart);
} }
} }
@ -1588,47 +1583,3 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (ac
composite->add_component (collect_container_info (active_transactions.generator, "generator")); composite->add_component (collect_container_info (active_transactions.generator, "generator"));
return composite; return composite;
} }
nano::dropped_elections::dropped_elections (nano::stat & stats_a) :
stats (stats_a)
{
}
void nano::dropped_elections::add (nano::qualified_root const & root_a)
{
stats.inc (nano::stat::type::election, nano::stat::detail::election_drop);
nano::lock_guard<nano::mutex> guard (mutex);
auto & items_by_sequence = items.get<tag_sequence> ();
items_by_sequence.emplace_back (nano::election_timepoint{ std::chrono::steady_clock::now (), root_a });
if (items.size () > capacity)
{
items_by_sequence.pop_front ();
}
}
void nano::dropped_elections::erase (nano::qualified_root const & root_a)
{
nano::lock_guard<nano::mutex> guard (mutex);
items.get<tag_root> ().erase (root_a);
}
std::chrono::steady_clock::time_point nano::dropped_elections::find (nano::qualified_root const & root_a) const
{
nano::lock_guard<nano::mutex> guard (mutex);
auto & items_by_root = items.get<tag_root> ();
auto existing (items_by_root.find (root_a));
if (existing != items_by_root.end ())
{
return existing->time;
}
else
{
return std::chrono::steady_clock::time_point{};
}
}
size_t nano::dropped_elections::size () const
{
nano::lock_guard<nano::mutex> guard (mutex);
return items.size ();
}

View file

@ -105,34 +105,6 @@ public:
bool aggressive_mode{ false }; bool aggressive_mode{ false };
}; };
class dropped_elections final
{
public:
dropped_elections (nano::stat &);
void add (nano::qualified_root const &);
void erase (nano::qualified_root const &);
std::chrono::steady_clock::time_point find (nano::qualified_root const &) const;
size_t size () const;
static size_t constexpr capacity{ 16 * 1024 };
// clang-format off
class tag_sequence {};
class tag_root {};
using ordered_dropped = boost::multi_index_container<nano::election_timepoint,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequence>>,
mi::hashed_unique<mi::tag<tag_root>,
mi::member<nano::election_timepoint, decltype(nano::election_timepoint::root), &nano::election_timepoint::root>>>>;
// clang-format on
private:
ordered_dropped items;
mutable nano::mutex mutex{ mutex_identifier (mutexes::dropped_elections) };
nano::stat & stats;
};
class election_insertion_result final class election_insertion_result final
{ {
public: public:
@ -222,7 +194,6 @@ public:
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> blocks; std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> blocks;
std::deque<nano::election_status> list_recently_cemented (); std::deque<nano::election_status> list_recently_cemented ();
std::deque<nano::election_status> recently_cemented; std::deque<nano::election_status> recently_cemented;
dropped_elections recently_dropped;
void add_recently_cemented (nano::election_status const &); void add_recently_cemented (nano::election_status const &);
void add_recently_confirmed (nano::qualified_root const &, nano::block_hash const &); void add_recently_confirmed (nano::qualified_root const &, nano::block_hash const &);