From 3af6dd6f26414ca9c24ef04f330ba6c4560259ff Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Wed, 8 Jul 2020 08:51:22 +0100 Subject: [PATCH] Fix TCP message manager not notifying producers (#2831) * Add hanging test under current implementation of tcp_message_manager * Solve hang by notifying other threads on consuming an item from the queue * Adjust test with mass producer consumer * Switch to condition.notify_one() on producer/consumer * Avoid instantiating memory twice in the most common situation * Incorrect condition notifications (Colin review) * Reduce chance of new test failing Co-authored-by: Wesley Shillingford --- nano/core_test/network.cpp | 62 ++++++++++++++++++++++++++++++++++++++ nano/node/network.cpp | 20 +++++++----- nano/node/network.hpp | 5 ++- 3 files changed, 78 insertions(+), 9 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 3b4f97f5..96f2684e 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -5,6 +5,7 @@ #include #include +#include #include using namespace std::chrono_literals; @@ -1014,3 +1015,64 @@ TEST (network, tcp_no_connect_excluded_peers) node1->network.merge_peer (node0->network.endpoint ()); ASSERT_TIMELY (5s, node0->network.size () == 1); } + +namespace nano +{ +TEST (network, tcp_message_manager) +{ + nano::tcp_message_manager manager (1); + nano::tcp_message_item item; + item.node_id = nano::account (100); + ASSERT_EQ (0, manager.entries.size ()); + manager.put_message (item); + ASSERT_EQ (1, manager.entries.size ()); + ASSERT_EQ (manager.get_message ().node_id, item.node_id); + ASSERT_EQ (0, manager.entries.size ()); + + // Fill the queue + manager.entries = decltype (manager.entries) (manager.max_entries, item); + ASSERT_EQ (manager.entries.size (), manager.max_entries); + + // This task will wait until a message is consumed + auto future = std::async (std::launch::async, [&] { + manager.put_message (item); + }); + + // This should give sufficient time to execute put_message + // and prove that it waits on condition variable + std::this_thread::sleep_for (CI ? 200ms : 100ms); + + ASSERT_EQ (manager.entries.size (), manager.max_entries); + ASSERT_EQ (manager.get_message ().node_id, item.node_id); + ASSERT_NE (std::future_status::timeout, future.wait_for (1s)); + ASSERT_EQ (manager.entries.size (), manager.max_entries); + + nano::tcp_message_manager manager2 (2); + size_t message_count = 10'000; + std::vector consumers; + for (auto i = 0; i < 4; ++i) + { + consumers.emplace_back ([&] { + for (auto i = 0; i < message_count; ++i) + { + ASSERT_EQ (manager.get_message ().node_id, item.node_id); + } + }); + } + std::vector producers; + for (auto i = 0; i < 4; ++i) + { + producers.emplace_back ([&] { + for (auto i = 0; i < message_count; ++i) + { + manager.put_message (item); + } + }); + } + + for (auto & t : boost::range::join (producers, consumers)) + { + t.join (); + } +} +} diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 58a57189..9fa13d44 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -891,32 +891,35 @@ void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item { { nano::unique_lock lock (mutex); - while (entries.size () > max_entries && !stopped) + while (entries.size () >= max_entries && !stopped) { - condition.wait (lock); + producer_condition.wait (lock); } entries.push_back (item_a); } - condition.notify_all (); + consumer_condition.notify_one (); } nano::tcp_message_item nano::tcp_message_manager::get_message () { + nano::tcp_message_item result; nano::unique_lock lock (mutex); while (entries.empty () && !stopped) { - condition.wait (lock); + consumer_condition.wait (lock); } if (!entries.empty ()) { - auto result (entries.front ()); + result = std::move (entries.front ()); entries.pop_front (); - return result; } else { - return nano::tcp_message_item{ std::make_shared (), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined }; + result = nano::tcp_message_item{ std::make_shared (), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined }; } + lock.unlock (); + producer_condition.notify_one (); + return result; } void nano::tcp_message_manager::stop () @@ -925,7 +928,8 @@ void nano::tcp_message_manager::stop () nano::lock_guard lock (mutex); stopped = true; } - condition.notify_all (); + consumer_condition.notify_all (); + producer_condition.notify_all (); } nano::syn_cookies::syn_cookies (size_t max_cookies_per_ip_a) : diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 780da546..666cf4d4 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -77,11 +77,14 @@ public: private: std::mutex mutex; - nano::condition_variable condition; + nano::condition_variable producer_condition; + nano::condition_variable consumer_condition; std::deque entries; unsigned max_entries; static unsigned const max_entries_per_connection = 16; bool stopped{ false }; + + friend class network_tcp_message_manager_Test; }; /** * Node ID cookies for node ID handshakes