Rework cemented notification
This commit is contained in:
parent
0747d4afe4
commit
b7ba1eb08a
3 changed files with 24 additions and 28 deletions
|
@ -30,17 +30,17 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
|
|||
{
|
||||
count_by_behavior.fill (0); // Zero initialize array
|
||||
|
||||
confirming_set.batch_cemented.add ([this] (nano::confirming_set::cemented_notification const & notification) {
|
||||
confirming_set.batch_cemented.add ([this] (auto const & cemented) {
|
||||
auto transaction = node.ledger.tx_begin_read ();
|
||||
for (auto const & [block, confirmation_root] : cemented)
|
||||
{
|
||||
auto transaction = node.ledger.tx_begin_read ();
|
||||
for (auto const & [block, confirmation_root] : notification.cemented)
|
||||
{
|
||||
transaction.refresh_if_needed ();
|
||||
|
||||
block_cemented_callback (transaction, block, confirmation_root);
|
||||
}
|
||||
transaction.refresh_if_needed ();
|
||||
block_cemented_callback (transaction, block, confirmation_root);
|
||||
}
|
||||
for (auto const & hash : notification.already_cemented)
|
||||
});
|
||||
|
||||
confirming_set.already_cemented.add ([this] (auto const & already_cemented) {
|
||||
for (auto const & hash : already_cemented)
|
||||
{
|
||||
block_already_cemented_callback (hash);
|
||||
}
|
||||
|
|
|
@ -12,8 +12,8 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na
|
|||
stats{ stats_a },
|
||||
notification_workers{ 1, nano::thread_role::name::confirmation_height_notifications }
|
||||
{
|
||||
batch_cemented.add ([this] (auto const & notification) {
|
||||
for (auto const & [block, confirmation_root] : notification.cemented)
|
||||
batch_cemented.add ([this] (auto const & cemented) {
|
||||
for (auto const & [block, confirmation_root] : cemented)
|
||||
{
|
||||
cemented_observers.notify (block);
|
||||
}
|
||||
|
@ -124,17 +124,17 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
|
|||
std::deque<cemented_t> cemented;
|
||||
std::deque<nano::block_hash> already;
|
||||
|
||||
auto batch = next_batch (256);
|
||||
auto batch = next_batch (batch_size);
|
||||
|
||||
lock.unlock ();
|
||||
|
||||
auto notify = [this, &cemented, &already] () {
|
||||
cemented_notification notification{};
|
||||
notification.cemented.swap (cemented);
|
||||
notification.already_cemented.swap (already);
|
||||
auto notify = [this, &cemented] () {
|
||||
std::deque<cemented_t> batch;
|
||||
batch.swap (cemented);
|
||||
|
||||
std::unique_lock lock{ mutex };
|
||||
|
||||
// It's possible that ledger cementing happens faster than the notifications can be processed by other components, cooldown here
|
||||
while (notification_workers.num_queued_tasks () >= config.max_queued_notifications)
|
||||
{
|
||||
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cooldown);
|
||||
|
@ -145,9 +145,9 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
|
|||
}
|
||||
}
|
||||
|
||||
notification_workers.push_task ([this, notification = std::move (notification)] () {
|
||||
notification_workers.push_task ([this, batch = std::move (batch)] () {
|
||||
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify);
|
||||
batch_cemented.notify (notification);
|
||||
batch_cemented.notify (batch);
|
||||
});
|
||||
};
|
||||
|
||||
|
@ -211,9 +211,9 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
|
|||
}
|
||||
|
||||
notify ();
|
||||
|
||||
release_assert (cemented.empty ());
|
||||
release_assert (already.empty ());
|
||||
|
||||
already_cemented.notify (already);
|
||||
}
|
||||
|
||||
nano::container_info nano::confirming_set::container_info () const
|
||||
|
|
|
@ -48,16 +48,10 @@ public:
|
|||
nano::container_info container_info () const;
|
||||
|
||||
public: // Events
|
||||
// Observers will be called once ledger has blocks marked as confirmed
|
||||
using cemented_t = std::pair<std::shared_ptr<nano::block>, nano::block_hash>; // <block, confirmation root>
|
||||
nano::observer_set<std::deque<cemented_t> const &> batch_cemented;
|
||||
nano::observer_set<std::deque<nano::block_hash> const &> already_cemented;
|
||||
|
||||
struct cemented_notification
|
||||
{
|
||||
std::deque<cemented_t> cemented;
|
||||
std::deque<nano::block_hash> already_cemented;
|
||||
};
|
||||
|
||||
nano::observer_set<cemented_notification const &> batch_cemented;
|
||||
nano::observer_set<std::shared_ptr<nano::block>> cemented_observers;
|
||||
|
||||
private: // Dependencies
|
||||
|
@ -79,5 +73,7 @@ private:
|
|||
mutable std::mutex mutex;
|
||||
std::condition_variable condition;
|
||||
std::thread thread;
|
||||
|
||||
static size_t constexpr batch_size = 256;
|
||||
};
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue