Wait until fair queue source drains before removal (#4597)

This commit is contained in:
Piotr Wójcik 2024-05-03 12:20:52 +02:00 committed by GitHub
commit 3dc70af067
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 12 additions and 9 deletions

View file

@ -260,17 +260,20 @@ TEST (fair_queue, cleanup)
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);
// Either closing or resetting the channel should remove it from the queue
// Either closing or resetting the channel should make it eligible for cleanup
channel1->close ();
channel2.reset ();
ASSERT_TRUE (queue.periodic_update ());
ASSERT_TRUE (queue.periodic_update (0s));
// Only channel 3 should remain
ASSERT_EQ (queue.size (), 1);
// Until the queue is drained, the entries are still present
ASSERT_EQ (queue.size (), 3);
ASSERT_EQ (queue.queues_size (), 3);
queue.next_batch (999);
ASSERT_TRUE (queue.periodic_update (0s));
ASSERT_TRUE (queue.empty ());
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 0);
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 0);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);
}

View file

@ -330,7 +330,7 @@ private:
iterator = queues.end ();
erase_if (queues, [] (auto const & entry) {
return !entry.first.alive ();
return entry.second.empty () && !entry.first.alive ();
});
}