Batch cemented callback

This commit is contained in:
Piotr Wójcik 2024-05-24 10:35:46 +02:00
commit b6ee1692c5
6 changed files with 57 additions and 30 deletions

View file

@ -117,6 +117,8 @@ enum class detail
triggered,
notify,
duplicate,
confirmed,
cemented,
// processing queue
queue,
@ -444,7 +446,8 @@ enum class detail
tier_3,
// confirming_set
confirmed,
notify_cemented,
notify_already_confirmed,
_last // Must be the last enum
};

View file

@ -18,10 +18,10 @@
using namespace std::chrono;
nano::active_elections::active_elections (nano::node & node_a, nano::confirming_set & confirming_set, nano::block_processor & block_processor_a) :
nano::active_elections::active_elections (nano::node & node_a, nano::confirming_set & confirming_set_a, nano::block_processor & block_processor_a) :
config{ node_a.config.active_elections },
node{ node_a },
confirming_set{ confirming_set },
confirming_set{ confirming_set_a },
block_processor{ block_processor_a },
recently_confirmed{ config.confirmation_cache },
recently_cemented{ config.confirmation_history_size },
@ -29,14 +29,20 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
{
count_by_behavior.fill (0); // Zero initialize array
// Register a callback which will get called after a block is cemented
confirming_set.cemented_observers.add ([this] (std::shared_ptr<nano::block> const & callback_block_a) {
this->block_cemented_callback (callback_block_a);
});
confirming_set.batch_cemented.add ([this] (nano::confirming_set::cemented_notification const & notification) {
{
auto transaction = node.ledger.tx_begin_read ();
for (auto const & block : notification.cemented)
{
transaction.refresh_if_needed ();
// Register a callback which will get called if a block is already cemented
confirming_set.block_already_cemented_observers.add ([this] (nano::block_hash const & hash_a) {
this->block_already_cemented_callback (hash_a);
block_cemented_callback (transaction, block);
}
}
for (auto const & hash : notification.already_cemented)
{
block_already_cemented_callback (hash);
}
});
// Notify elections about alternative (forked) blocks
@ -84,7 +90,7 @@ void nano::active_elections::stop ()
clear ();
}
void nano::active_elections::block_cemented_callback (std::shared_ptr<nano::block> const & block)
void nano::active_elections::block_cemented_callback (nano::secure::transaction const & transaction, std::shared_ptr<nano::block> const & block)
{
debug_assert (node.block_confirmed (block->hash ()));
if (auto election_l = election (block->qualified_root ()))
@ -100,7 +106,7 @@ void nano::active_elections::block_cemented_callback (std::shared_ptr<nano::bloc
status = election->get_status ();
votes = election->votes_with_weight ();
}
if (confirming_set.exists (block->hash ()))
if (confirming_set.exists (block->hash ())) // TODO: This can be passed from the confirming_set
{
status.type = nano::election_status_type::active_confirmed_quorum;
}
@ -113,7 +119,7 @@ void nano::active_elections::block_cemented_callback (std::shared_ptr<nano::bloc
status.type = nano::election_status_type::inactive_confirmation_height;
}
recently_cemented.put (status);
auto transaction = node.ledger.tx_begin_read ();
notify_observers (transaction, status, votes);
bool cemented_bootstrap_count_reached = node.ledger.cemented_count () >= node.ledger.bootstrap_weight_max_blocks;
bool was_active = status.type == nano::election_status_type::active_confirmed_quorum || status.type == nano::election_status_type::active_confirmation_height;

View file

@ -120,8 +120,6 @@ public:
bool empty () const;
std::size_t size () const;
bool publish (std::shared_ptr<nano::block> const &);
void block_cemented_callback (std::shared_ptr<nano::block> const &);
void block_already_cemented_callback (nano::block_hash const &);
/**
* Maximum number of elections that should be present in this container
@ -148,6 +146,8 @@ private:
std::vector<std::shared_ptr<nano::election>> list_active_impl (std::size_t) const;
void activate_successors (nano::secure::transaction const &, std::shared_ptr<nano::block> const & block);
void notify_observers (nano::secure::transaction const &, nano::election_status const & status, std::vector<nano::vote_with_weight_info> const & votes) const;
void block_cemented_callback (nano::secure::transaction const &, std::shared_ptr<nano::block> const &);
void block_already_cemented_callback (nano::block_hash const &);
private: // Dependencies
active_elections_config const & config;

View file

@ -7,12 +7,24 @@
#include <nano/store/component.hpp>
#include <nano/store/write_queue.hpp>
nano::confirming_set::confirming_set (nano::ledger & ledger, nano::stats & stats, std::chrono::milliseconds batch_time) :
ledger{ ledger },
stats{ stats },
batch_time{ batch_time },
nano::confirming_set::confirming_set (nano::ledger & ledger_a, nano::stats & stats_a, std::chrono::milliseconds batch_time_a) :
ledger{ ledger_a },
stats{ stats_a },
batch_time{ batch_time_a },
workers{ 1, nano::thread_role::name::confirmation_height_notifications }
{
batch_cemented.add ([this] (auto const & notification) {
for (auto const & i : notification.cemented)
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify_cemented);
cemented_observers.notify (i);
}
for (auto const & i : notification.already_cemented)
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify_already_confirmed);
block_already_cemented_observers.notify (i);
}
});
}
nano::confirming_set::~confirming_set ()
@ -125,7 +137,7 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
{
// Confirming this block may implicitly confirm more
cemented.insert (cemented.end (), added.begin (), added.end ());
stats.add (nano::stat::type::confirming_set, nano::stat::detail::confirmed, added.size ());
stats.add (nano::stat::type::confirming_set, nano::stat::detail::cemented, added.size ());
}
else
{
@ -139,17 +151,14 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
lock.unlock ();
workers.push_task ([this, cemented = std::move (cemented), already = std::move (already)] () {
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify);
cemented_notification notification{
.cemented = std::move (cemented),
.already_cemented = std::move (already)
};
for (auto const & i : cemented)
{
cemented_observers.notify (i);
}
for (auto const & i : already)
{
block_already_cemented_observers.notify (i);
}
workers.push_task ([this, notification = std::move (notification)] () {
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify);
batch_cemented.notify (notification);
});
lock.lock ();

View file

@ -41,7 +41,15 @@ public:
std::size_t size () const;
std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const;
public: // Events
// Observers will be called once ledger has blocks marked as confirmed
struct cemented_notification
{
std::deque<std::shared_ptr<nano::block>> cemented;
std::deque<nano::block_hash> already_cemented;
};
nano::observer_set<cemented_notification const &> batch_cemented;
nano::observer_set<std::shared_ptr<nano::block>> cemented_observers;
nano::observer_set<nano::block_hash const &> block_already_cemented_observers;

View file

@ -476,6 +476,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
}
}
confirming_set.cemented_observers.add ([this] (auto const & block) {
// TODO: Is it neccessary to call this for all blocks?
if (block->is_send ())
{
workers.push_task ([this, hash = block->hash (), destination = block->destination ()] () {