Fix TCP message manager not notifying producers (#2831)

* Add hanging test under current implementation of tcp_message_manager

* Solve hang by notifying other threads on consuming an item from the queue

* Adjust test with mass producer consumer

* Switch to condition.notify_one() on producer/consumer

* Avoid instantiating memory twice in the most common situation

* Incorrect condition notifications (Colin review)

* Reduce chance of new test failing

Co-authored-by: Wesley Shillingford <wezrule@hotmail.com>
This commit is contained in:
Guilherme Lawless 2020-07-08 08:51:22 +01:00 committed by GitHub
commit 3af6dd6f26
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 9 deletions

View file

@ -5,6 +5,7 @@
#include <gtest/gtest.h>
#include <boost/iostreams/stream_buffer.hpp>
#include <boost/range/join.hpp>
#include <boost/thread.hpp>
using namespace std::chrono_literals;
@ -1014,3 +1015,64 @@ TEST (network, tcp_no_connect_excluded_peers)
node1->network.merge_peer (node0->network.endpoint ());
ASSERT_TIMELY (5s, node0->network.size () == 1);
}
namespace nano
{
TEST (network, tcp_message_manager)
{
nano::tcp_message_manager manager (1);
nano::tcp_message_item item;
item.node_id = nano::account (100);
ASSERT_EQ (0, manager.entries.size ());
manager.put_message (item);
ASSERT_EQ (1, manager.entries.size ());
ASSERT_EQ (manager.get_message ().node_id, item.node_id);
ASSERT_EQ (0, manager.entries.size ());
// Fill the queue
manager.entries = decltype (manager.entries) (manager.max_entries, item);
ASSERT_EQ (manager.entries.size (), manager.max_entries);
// This task will wait until a message is consumed
auto future = std::async (std::launch::async, [&] {
manager.put_message (item);
});
// This should give sufficient time to execute put_message
// and prove that it waits on condition variable
std::this_thread::sleep_for (CI ? 200ms : 100ms);
ASSERT_EQ (manager.entries.size (), manager.max_entries);
ASSERT_EQ (manager.get_message ().node_id, item.node_id);
ASSERT_NE (std::future_status::timeout, future.wait_for (1s));
ASSERT_EQ (manager.entries.size (), manager.max_entries);
nano::tcp_message_manager manager2 (2);
size_t message_count = 10'000;
std::vector<std::thread> consumers;
for (auto i = 0; i < 4; ++i)
{
consumers.emplace_back ([&] {
for (auto i = 0; i < message_count; ++i)
{
ASSERT_EQ (manager.get_message ().node_id, item.node_id);
}
});
}
std::vector<std::thread> producers;
for (auto i = 0; i < 4; ++i)
{
producers.emplace_back ([&] {
for (auto i = 0; i < message_count; ++i)
{
manager.put_message (item);
}
});
}
for (auto & t : boost::range::join (producers, consumers))
{
t.join ();
}
}
}

View file

@ -891,32 +891,35 @@ void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item
{
{
nano::unique_lock<std::mutex> lock (mutex);
while (entries.size () > max_entries && !stopped)
while (entries.size () >= max_entries && !stopped)
{
condition.wait (lock);
producer_condition.wait (lock);
}
entries.push_back (item_a);
}
condition.notify_all ();
consumer_condition.notify_one ();
}
nano::tcp_message_item nano::tcp_message_manager::get_message ()
{
nano::tcp_message_item result;
nano::unique_lock<std::mutex> lock (mutex);
while (entries.empty () && !stopped)
{
condition.wait (lock);
consumer_condition.wait (lock);
}
if (!entries.empty ())
{
auto result (entries.front ());
result = std::move (entries.front ());
entries.pop_front ();
return result;
}
else
{
return nano::tcp_message_item{ std::make_shared<nano::keepalive> (), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined };
result = nano::tcp_message_item{ std::make_shared<nano::keepalive> (), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined };
}
lock.unlock ();
producer_condition.notify_one ();
return result;
}
void nano::tcp_message_manager::stop ()
@ -925,7 +928,8 @@ void nano::tcp_message_manager::stop ()
nano::lock_guard<std::mutex> lock (mutex);
stopped = true;
}
condition.notify_all ();
consumer_condition.notify_all ();
producer_condition.notify_all ();
}
nano::syn_cookies::syn_cookies (size_t max_cookies_per_ip_a) :

View file

@ -77,11 +77,14 @@ public:
private:
std::mutex mutex;
nano::condition_variable condition;
nano::condition_variable producer_condition;
nano::condition_variable consumer_condition;
std::deque<nano::tcp_message_item> entries;
unsigned max_entries;
static unsigned const max_entries_per_connection = 16;
bool stopped{ false };
friend class network_tcp_message_manager_Test;
};
/**
* Node ID cookies for node ID handshakes