From 676f71525bf87b4685e10904ec29a99dd8a02faa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Wed, 3 Apr 2024 22:32:33 +0200 Subject: [PATCH] Store origin channel as weak_ptr --- nano/core_test/fair_queue.cpp | 3 +- nano/node/fair_queue.hpp | 92 ++++++++++++++++++++++++++++++----- 2 files changed, 82 insertions(+), 13 deletions(-) diff --git a/nano/core_test/fair_queue.cpp b/nano/core_test/fair_queue.cpp index 4078dea08..178365b17 100644 --- a/nano/core_test/fair_queue.cpp +++ b/nano/core_test/fair_queue.cpp @@ -260,8 +260,9 @@ TEST (fair_queue, cleanup) ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1); ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1); + // Either closing or resetting the channel should remove it from the queue channel1->close (); - channel2->close (); + channel2.reset (); ASSERT_TRUE (queue.periodic_update ()); diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index e066ec894..e13ed847c 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -18,9 +18,6 @@ template class fair_queue final { public: - /** - * Holds user supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request. - */ struct origin { Source source; @@ -28,24 +25,95 @@ public: origin (Source source, std::shared_ptr channel = nullptr) : source{ source }, - channel{ std::move (channel) } + channel{ channel } + { + } + }; + +private: + /** + * Holds user supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request. + */ + struct origin_entry + { + Source source; + + // Optional is needed to distinguish between a source with no associated channel and a source with an expired channel + // TODO: Store channel as shared_ptr after networking fixes are done + std::optional> maybe_channel; + + origin_entry (Source source, std::shared_ptr channel = nullptr) : + source{ source } + { + if (channel) + { + maybe_channel = std::weak_ptr{ channel }; + } + } + + origin_entry (origin const & origin) : + origin_entry (origin.source, origin.channel) { } bool alive () const { - if (channel) + if (maybe_channel) { - return channel->alive (); + if (auto channel_l = maybe_channel->lock ()) + { + return channel_l->alive (); + } + else + { + return false; + } + } + else + { + // Some sources (eg. local RPC) don't have an associated channel, never remove their queue + return true; } - // Some sources (eg. local RPC) don't have an associated channel, never remove their queue - return true; } - auto operator<=> (origin const &) const = default; + // TODO: Store channel as shared_ptr to avoid this mess + auto operator<=> (origin_entry const & other) const + { + // First compare source + if (auto cmp = source <=> other.source; cmp != 0) + return cmp; + + if (maybe_channel && other.maybe_channel) + { + // Then compare channels by ownership, not by the channel's value or state + std::owner_less> less; + if (less (*maybe_channel, *other.maybe_channel)) + return std::strong_ordering::less; + if (less (*other.maybe_channel, *maybe_channel)) + return std::strong_ordering::greater; + + return std::strong_ordering::equivalent; + } + else + { + if (maybe_channel && !other.maybe_channel) + { + return std::strong_ordering::greater; + } + if (!maybe_channel && other.maybe_channel) + { + return std::strong_ordering::less; + } + return std::strong_ordering::equivalent; + } + } + + operator origin () const + { + return { source, maybe_channel ? maybe_channel->lock () : nullptr }; + } }; -private: struct entry { using queue_t = std::deque; @@ -271,8 +339,8 @@ private: } private: - std::map queues; - std::map::iterator iterator{ queues.end () }; + std::map queues; + std::map::iterator iterator{ queues.end () }; size_t counter{ 0 }; std::chrono::steady_clock::time_point last_update{};