Store origin channel as weak_ptr

This commit is contained in:
Piotr Wójcik 2024-04-03 22:32:33 +02:00
commit 676f71525b
2 changed files with 82 additions and 13 deletions

View file

@ -260,8 +260,9 @@ TEST (fair_queue, cleanup)
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);
// Either closing or resetting the channel should remove it from the queue
channel1->close ();
channel2->close ();
channel2.reset ();
ASSERT_TRUE (queue.periodic_update ());

View file

@ -18,9 +18,6 @@ template <typename Request, typename Source>
class fair_queue final
{
public:
/**
* Holds user supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request.
*/
struct origin
{
Source source;
@ -28,24 +25,95 @@ public:
origin (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source },
channel{ std::move (channel) }
channel{ channel }
{
}
};
private:
/**
* Holds user supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request.
*/
struct origin_entry
{
Source source;
// Optional is needed to distinguish between a source with no associated channel and a source with an expired channel
// TODO: Store channel as shared_ptr after networking fixes are done
std::optional<std::weak_ptr<nano::transport::channel>> maybe_channel;
origin_entry (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source }
{
if (channel)
{
maybe_channel = std::weak_ptr{ channel };
}
}
origin_entry (origin const & origin) :
origin_entry (origin.source, origin.channel)
{
}
bool alive () const
{
if (channel)
if (maybe_channel)
{
return channel->alive ();
if (auto channel_l = maybe_channel->lock ())
{
return channel_l->alive ();
}
else
{
return false;
}
}
else
{
// Some sources (eg. local RPC) don't have an associated channel, never remove their queue
return true;
}
// Some sources (eg. local RPC) don't have an associated channel, never remove their queue
return true;
}
auto operator<=> (origin const &) const = default;
// TODO: Store channel as shared_ptr to avoid this mess
auto operator<=> (origin_entry const & other) const
{
// First compare source
if (auto cmp = source <=> other.source; cmp != 0)
return cmp;
if (maybe_channel && other.maybe_channel)
{
// Then compare channels by ownership, not by the channel's value or state
std::owner_less<std::weak_ptr<nano::transport::channel>> less;
if (less (*maybe_channel, *other.maybe_channel))
return std::strong_ordering::less;
if (less (*other.maybe_channel, *maybe_channel))
return std::strong_ordering::greater;
return std::strong_ordering::equivalent;
}
else
{
if (maybe_channel && !other.maybe_channel)
{
return std::strong_ordering::greater;
}
if (!maybe_channel && other.maybe_channel)
{
return std::strong_ordering::less;
}
return std::strong_ordering::equivalent;
}
}
operator origin () const
{
return { source, maybe_channel ? maybe_channel->lock () : nullptr };
}
};
private:
struct entry
{
using queue_t = std::deque<Request>;
@ -271,8 +339,8 @@ private:
}
private:
std::map<origin_type, entry> queues;
std::map<origin_type, entry>::iterator iterator{ queues.end () };
std::map<origin_entry, entry> queues;
std::map<origin_entry, entry>::iterator iterator{ queues.end () };
size_t counter{ 0 };
std::chrono::steady_clock::time_point last_update{};