Merge PR Remove message buffer class and its manager class (#4141)

Remove message buffer class and its manager class
This commit is contained in:
Thiago Silva 2023-02-18 09:25:04 -03:00 committed by GitHub
commit 2ccba41bae
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 1 addition and 297 deletions

View file

@ -700,154 +700,6 @@ TEST (node, port_mapping)
}
}
TEST (message_buffer_manager, one_buffer)
{
	nano::stats stats;
	// Manager holding a single 512-byte buffer
	nano::message_buffer_manager buffer (stats, 512, 1);
	// A fresh manager hands out a valid buffer
	auto first = buffer.allocate ();
	ASSERT_NE (nullptr, first);
	// Enqueue followed by dequeue yields the very same buffer
	buffer.enqueue (first);
	auto dequeued = buffer.dequeue ();
	ASSERT_EQ (first, dequeued);
	// Once released, the single slot is recycled by the next allocate
	buffer.release (dequeued);
	auto recycled = buffer.allocate ();
	ASSERT_EQ (first, recycled);
}
TEST (message_buffer_manager, two_buffers)
{
	nano::stats stats;
	// Manager holding exactly two 512-byte buffers
	nano::message_buffer_manager buffer (stats, 512, 2);
	auto first = buffer.allocate ();
	ASSERT_NE (nullptr, first);
	auto second = buffer.allocate ();
	ASSERT_NE (nullptr, second);
	ASSERT_NE (first, second);
	// Enqueue in reverse allocation order; dequeue must follow FIFO order
	buffer.enqueue (second);
	buffer.enqueue (first);
	auto out1 = buffer.dequeue ();
	ASSERT_EQ (second, out1);
	auto out2 = buffer.dequeue ();
	ASSERT_EQ (first, out2);
	// Released buffers are handed back out in release order
	buffer.release (out1);
	buffer.release (out2);
	auto recycled1 = buffer.allocate ();
	ASSERT_EQ (second, recycled1);
	auto recycled2 = buffer.allocate ();
	ASSERT_EQ (first, recycled2);
}
TEST (message_buffer_manager, one_overflow)
{
	nano::stats stats;
	// Single-buffer manager: allocating while the only buffer sits in the
	// full queue forces the manager to steal (overflow) that buffer.
	nano::message_buffer_manager buffer (stats, 512, 1);
	auto first = buffer.allocate ();
	ASSERT_NE (nullptr, first);
	buffer.enqueue (first);
	auto stolen = buffer.allocate ();
	ASSERT_EQ (first, stolen);
}
TEST (message_buffer_manager, two_overflow)
{
	nano::stats stats;
	// Two-buffer manager with both buffers queued: further allocations must
	// steal the queued buffers in FIFO (oldest-first) order.
	nano::message_buffer_manager buffer (stats, 512, 2);
	auto first = buffer.allocate ();
	ASSERT_NE (nullptr, first);
	buffer.enqueue (first);
	auto second = buffer.allocate ();
	ASSERT_NE (nullptr, second);
	ASSERT_NE (first, second);
	buffer.enqueue (second);
	auto stolen1 = buffer.allocate ();
	ASSERT_EQ (first, stolen1);
	auto stolen2 = buffer.allocate ();
	ASSERT_EQ (second, stolen2);
}
// Exercises allocate/enqueue on the main thread against a consumer thread
// that dequeues and releases until stop () makes dequeue return nullptr.
TEST (message_buffer_manager, one_buffer_multithreaded)
{
nano::stats stats;
nano::message_buffer_manager buffer (stats, 512, 1);
// Consumer: spin on dequeue, releasing each buffer; a nullptr result
// (only produced after stop ()) terminates the loop.
boost::thread thread ([&buffer] () {
auto done (false);
while (!done)
{
auto item (buffer.dequeue ());
done = item == nullptr;
if (item != nullptr)
{
buffer.release (item);
}
}
});
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
buffer.enqueue (buffer1);
// With a single slot, the second allocate either receives the buffer the
// consumer released or steals it from the full queue — either way it is
// the same underlying buffer.
auto buffer2 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer2);
// Unblock and join the consumer before the manager goes out of scope.
buffer.stop ();
thread.join ();
}
// Stress test: 4 consumer threads and 4 producer threads contend over a
// 16-buffer manager; stop () is what ultimately terminates all loops.
TEST (message_buffer_manager, many_buffers_multithreaded)
{
nano::stats stats;
nano::message_buffer_manager buffer (stats, 512, 16);
std::vector<boost::thread> threads;
// Consumers: dequeue and release until dequeue returns nullptr (stopped).
for (auto i (0); i < 4; ++i)
{
threads.push_back (boost::thread ([&buffer] () {
auto done (false);
while (!done)
{
auto item (buffer.dequeue ());
done = item == nullptr;
if (item != nullptr)
{
buffer.release (item);
}
}
}));
}
std::atomic_int count (0);
// Producers: allocate and enqueue up to 1000 buffers each; once the shared
// counter passes 3000 one of them calls stop () to wind the test down.
for (auto i (0); i < 4; ++i)
{
threads.push_back (boost::thread ([&buffer, &count] () {
auto done (false);
for (auto i (0); !done && i < 1000; ++i)
{
auto item (buffer.allocate ());
done = item == nullptr;
if (item != nullptr)
{
buffer.enqueue (item);
++count;
if (count > 3000)
{
buffer.stop ();
}
}
}
}));
}
// NOTE(review): the main thread stops immediately rather than waiting for
// the producers' 3000-message threshold, so the test mainly checks clean
// shutdown under contention — stop () is idempotent, so the duplicate call
// is harmless.
buffer.stop ();
for (auto & i : threads)
{
i.join ();
}
}
TEST (message_buffer_manager, stats)
{
	nano::stats stats;
	nano::message_buffer_manager buffer (stats, 512, 1);
	auto only = buffer.allocate ();
	buffer.enqueue (only);
	// With the single buffer queued, this allocate must steal it, which the
	// manager records as an overflow in the stats container.
	buffer.allocate ();
	ASSERT_EQ (1, stats.count (nano::stat::type::udp, nano::stat::detail::overflow));
}
TEST (tcp_listener, tcp_node_id_handshake)
{
nano::test::system system (1);

View file

@ -3,7 +3,6 @@
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>
#include <nano/node/telemetry.hpp>
#include <nano/secure/buffer.hpp>
#include <boost/format.hpp>
@ -19,7 +18,6 @@ nano::network::network (nano::node & node_a, uint16_t port_a) :
debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min);
process_message (message, channel);
} },
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
resolver (node_a.io_ctx),
tcp_message_manager (node_a.config.tcp_incoming_connections_max),
node (node_a),
@ -92,7 +90,6 @@ void nano::network::stop ()
{
tcp_channels.stop ();
resolver.cancel ();
buffer_container.stop ();
tcp_message_manager.stop ();
port = 0;
for (auto & thread : packet_processing_threads)
@ -751,98 +748,6 @@ void nano::network::exclude (std::shared_ptr<nano::transport::channel> const & c
erase (*channel);
}
/*
* message_buffer_manager
*/
// Pre-allocates one contiguous slab of `size * count` bytes and carves it
// into `count` message_buffer entries, all of which start on the free list.
// Stats - statistics container overflow/blocking events are reported to
// Size  - size in bytes of each individual buffer
// Count - number of buffers to allocate (capacity of free/full rings)
nano::message_buffer_manager::message_buffer_manager (nano::stats & stats_a, std::size_t size, std::size_t count) :
stats (stats_a),
free (count),
full (count),
slab (size * count),
entries (count),
stopped (false)
{
debug_assert (count > 0);
debug_assert (size > 0);
auto slab_data (slab.data ());
auto entry_data (entries.data ());
// std::size_t index avoids the signed/unsigned comparison (and potential
// truncation for very large counts) that `auto i (0)` introduced against
// the std::size_t `count` parameter.
for (std::size_t i = 0; i < count; ++i, ++entry_data)
{
// Each entry points into the shared slab; entries own no memory themselves.
*entry_data = { slab_data + i * size, 0, nano::endpoint () };
free.push_back (entry_data);
}
}
// Returns the first free buffer; if none are free, steals the oldest
// unserviced (full) buffer instead. Blocks while both queues are empty and
// the container has not been stopped; returns nullptr only once stopped.
nano::message_buffer * nano::message_buffer_manager::allocate ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
if (!stopped && free.empty () && full.empty ())
{
// Record that a caller had to block waiting for any buffer at all.
stats.inc (nano::stat::type::udp, nano::stat::detail::blocking, nano::stat::dir::in);
condition.wait (lock, [&stopped = stopped, &free = free, &full = full] { return stopped || !free.empty () || !full.empty (); });
}
nano::message_buffer * result (nullptr);
if (!free.empty ())
{
result = free.front ();
free.pop_front ();
}
// No free buffer available: drop (reuse) the oldest queued-but-unserviced
// buffer and count the lost message as an overflow.
if (result == nullptr && !full.empty ())
{
result = full.front ();
full.pop_front ();
stats.inc (nano::stat::type::udp, nano::stat::detail::overflow, nano::stat::dir::in);
}
// A null result is only legal after stop () has been called.
release_assert (result || stopped);
return result;
}
// Publishes a buffer that has been filled with message data and wakes any
// threads blocked in dequeue ()/allocate ().
void nano::message_buffer_manager::enqueue (nano::message_buffer * data_a)
{
debug_assert (data_a != nullptr);
{
nano::lock_guard<nano::mutex> lock{ mutex };
full.push_back (data_a);
}
// Notify after releasing the lock so woken threads can acquire it at once.
condition.notify_all ();
}
// Returns the oldest buffer that has been filled with message data, blocking
// until one is enqueued or the container is stopped.
nano::message_buffer * nano::message_buffer_manager::dequeue ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && full.empty ())
{
condition.wait (lock);
}
nano::message_buffer * result (nullptr);
// Even after stop () the queue may still hold data; drain it before
// returning nullptr so consumers can finish outstanding work.
if (!full.empty ())
{
result = full.front ();
full.pop_front ();
}
return result;
}
// Returns a serviced buffer to the free list and wakes any threads blocked
// in allocate () waiting for capacity.
void nano::message_buffer_manager::release (nano::message_buffer * data_a)
{
debug_assert (data_a != nullptr);
{
nano::lock_guard<nano::mutex> lock{ mutex };
free.push_back (data_a);
}
// Notify outside the lock so woken threads can acquire it immediately.
condition.notify_all ();
}
// Marks the container stopped and wakes every thread blocked in
// allocate ()/dequeue (); those calls then return nullptr. Safe to call
// more than once.
void nano::message_buffer_manager::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();
}
nano::tcp_message_manager::tcp_message_manager (unsigned incoming_connections_max_a) :
max_entries (incoming_connections_max_a * nano::tcp_message_manager::max_entries_per_connection + 1)
{

View file

@ -7,65 +7,13 @@
#include <boost/thread/thread.hpp>
#include <deque>
#include <memory>
#include <queue>
#include <unordered_set>
namespace nano
{
class channel;
class node;
class stats;
class transaction;
// A single receive slot managed by message_buffer_manager: a view into the
// manager's pre-allocated slab plus metadata about the received message.
class message_buffer final
{
public:
// Pointer into the manager's slab; not owned by this object
uint8_t * buffer{ nullptr };
// Number of valid message bytes currently held in `buffer`
std::size_t size{ 0 };
// Remote endpoint the message data came from
nano::endpoint endpoint;
};
/**
 * A circular buffer for servicing nano realtime messages.
 * This container follows a producer/consumer model where the operating system is producing data in to
 * buffers which are serviced by internal threads.
 * If buffers are not serviced fast enough they're internally dropped.
 * This container has a maximum space to hold N buffers of M size and will allocate them in round-robin order.
 * All public methods are thread-safe
 */
class message_buffer_manager final
{
public:
// Stats - Statistics
// Size - Size of each individual buffer
// Count - Number of buffers to allocate
message_buffer_manager (nano::stats & stats, std::size_t, std::size_t);
// Return a buffer where message data can be put
// Method will attempt to return the first free buffer
// If there are no free buffers, an unserviced buffer will be dequeued and returned
// Function will block if there are no free or unserviced buffers
// Return nullptr if the container has stopped
nano::message_buffer * allocate ();
// Queue a buffer that has been filled with message data and notify servicing threads
void enqueue (nano::message_buffer *);
// Return a buffer that has been filled with message data
// Function will block until a buffer has been added
// Return nullptr if the container has stopped
nano::message_buffer * dequeue ();
// Return a buffer to the freelist after it has been serviced
void release (nano::message_buffer *);
// Stop container and notify waiting threads
void stop ();
private:
// Statistics container overflow/blocking events are reported to
nano::stats & stats;
// Guards both rings below; condition signals free/full transitions and stop
nano::mutex mutex;
nano::condition_variable condition;
// Buffers ready to receive data (recycled by release ())
boost::circular_buffer<nano::message_buffer *> free;
// Buffers holding message data awaiting service (filled by enqueue ())
boost::circular_buffer<nano::message_buffer *> full;
// Single backing allocation all buffers point into
std::vector<uint8_t> slab;
// The buffer objects themselves; free/full hold pointers into this vector
std::vector<nano::message_buffer> entries;
// Set by stop (); read under mutex
bool stopped;
};
class tcp_message_manager final
{
public:
@ -181,7 +129,6 @@ private:
public:
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> inbound;
nano::message_buffer_manager buffer_container;
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
nano::peer_exclusion excluded_peers;