diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 99f5ca6d..6ce5670f 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -700,154 +700,6 @@ TEST (node, port_mapping) } } -TEST (message_buffer_manager, one_buffer) -{ - nano::stats stats; - nano::message_buffer_manager buffer (stats, 512, 1); - auto buffer1 (buffer.allocate ()); - ASSERT_NE (nullptr, buffer1); - buffer.enqueue (buffer1); - auto buffer2 (buffer.dequeue ()); - ASSERT_EQ (buffer1, buffer2); - buffer.release (buffer2); - auto buffer3 (buffer.allocate ()); - ASSERT_EQ (buffer1, buffer3); -} - -TEST (message_buffer_manager, two_buffers) -{ - nano::stats stats; - nano::message_buffer_manager buffer (stats, 512, 2); - auto buffer1 (buffer.allocate ()); - ASSERT_NE (nullptr, buffer1); - auto buffer2 (buffer.allocate ()); - ASSERT_NE (nullptr, buffer2); - ASSERT_NE (buffer1, buffer2); - buffer.enqueue (buffer2); - buffer.enqueue (buffer1); - auto buffer3 (buffer.dequeue ()); - ASSERT_EQ (buffer2, buffer3); - auto buffer4 (buffer.dequeue ()); - ASSERT_EQ (buffer1, buffer4); - buffer.release (buffer3); - buffer.release (buffer4); - auto buffer5 (buffer.allocate ()); - ASSERT_EQ (buffer2, buffer5); - auto buffer6 (buffer.allocate ()); - ASSERT_EQ (buffer1, buffer6); -} - -TEST (message_buffer_manager, one_overflow) -{ - nano::stats stats; - nano::message_buffer_manager buffer (stats, 512, 1); - auto buffer1 (buffer.allocate ()); - ASSERT_NE (nullptr, buffer1); - buffer.enqueue (buffer1); - auto buffer2 (buffer.allocate ()); - ASSERT_EQ (buffer1, buffer2); -} - -TEST (message_buffer_manager, two_overflow) -{ - nano::stats stats; - nano::message_buffer_manager buffer (stats, 512, 2); - auto buffer1 (buffer.allocate ()); - ASSERT_NE (nullptr, buffer1); - buffer.enqueue (buffer1); - auto buffer2 (buffer.allocate ()); - ASSERT_NE (nullptr, buffer2); - ASSERT_NE (buffer1, buffer2); - buffer.enqueue (buffer2); - auto buffer3 (buffer.allocate ()); - ASSERT_EQ (buffer1, buffer3); - auto buffer4 (buffer.allocate ()); - ASSERT_EQ (buffer2, buffer4); -} - -TEST (message_buffer_manager, one_buffer_multithreaded) -{ - nano::stats stats; - nano::message_buffer_manager buffer (stats, 512, 1); - boost::thread thread ([&buffer] () { - auto done (false); - while (!done) - { - auto item (buffer.dequeue ()); - done = item == nullptr; - if (item != nullptr) - { - buffer.release (item); - } - } - }); - auto buffer1 (buffer.allocate ()); - ASSERT_NE (nullptr, buffer1); - buffer.enqueue (buffer1); - auto buffer2 (buffer.allocate ()); - ASSERT_EQ (buffer1, buffer2); - buffer.stop (); - thread.join (); -} - -TEST (message_buffer_manager, many_buffers_multithreaded) -{ - nano::stats stats; - nano::message_buffer_manager buffer (stats, 512, 16); - std::vector threads; - for (auto i (0); i < 4; ++i) - { - threads.push_back (boost::thread ([&buffer] () { - auto done (false); - while (!done) - { - auto item (buffer.dequeue ()); - done = item == nullptr; - if (item != nullptr) - { - buffer.release (item); - } - } - })); - } - std::atomic_int count (0); - for (auto i (0); i < 4; ++i) - { - threads.push_back (boost::thread ([&buffer, &count] () { - auto done (false); - for (auto i (0); !done && i < 1000; ++i) - { - auto item (buffer.allocate ()); - done = item == nullptr; - if (item != nullptr) - { - buffer.enqueue (item); - ++count; - if (count > 3000) - { - buffer.stop (); - } - } - } - })); - } - buffer.stop (); - for (auto & i : threads) - { - i.join (); - } -} - -TEST (message_buffer_manager, stats) -{ - nano::stats stats; - nano::message_buffer_manager buffer (stats, 512, 1); - auto buffer1 (buffer.allocate ()); - buffer.enqueue (buffer1); - buffer.allocate (); - ASSERT_EQ (1, stats.count (nano::stat::type::udp, nano::stat::detail::overflow)); -} - TEST (tcp_listener, tcp_node_id_handshake) { nano::test::system system (1); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index c4eb6816..63940171 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include @@ -19,7 +18,6 @@ nano::network::network (nano::node & node_a, uint16_t port_a) : debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min); process_message (message, channel); } }, - buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer resolver (node_a.io_ctx), tcp_message_manager (node_a.config.tcp_incoming_connections_max), node (node_a), @@ -92,7 +90,6 @@ void nano::network::stop () { tcp_channels.stop (); resolver.cancel (); - buffer_container.stop (); tcp_message_manager.stop (); port = 0; for (auto & thread : packet_processing_threads) @@ -751,98 +748,6 @@ void nano::network::exclude (std::shared_ptr const & c erase (*channel); } -/* - * message_buffer_manager - */ - -nano::message_buffer_manager::message_buffer_manager (nano::stats & stats_a, std::size_t size, std::size_t count) : - stats (stats_a), - free (count), - full (count), - slab (size * count), - entries (count), - stopped (false) -{ - debug_assert (count > 0); - debug_assert (size > 0); - auto slab_data (slab.data ()); - auto entry_data (entries.data ()); - for (auto i (0); i < count; ++i, ++entry_data) - { - *entry_data = { slab_data + i * size, 0, nano::endpoint () }; - free.push_back (entry_data); - } -} - -nano::message_buffer * nano::message_buffer_manager::allocate () -{ - nano::unique_lock lock{ mutex }; - if (!stopped && free.empty () && full.empty ()) - { - stats.inc (nano::stat::type::udp, nano::stat::detail::blocking, nano::stat::dir::in); - condition.wait (lock, [&stopped = stopped, &free = free, &full = full] { return stopped || !free.empty () || !full.empty (); }); - } - nano::message_buffer * result (nullptr); - if (!free.empty ()) - { - result = free.front (); - free.pop_front (); - } - if (result == nullptr && !full.empty ()) - { - result = full.front (); - full.pop_front (); - stats.inc (nano::stat::type::udp, nano::stat::detail::overflow, nano::stat::dir::in); - } - release_assert (result || stopped); - return result; -} - -void nano::message_buffer_manager::enqueue (nano::message_buffer * data_a) -{ - debug_assert (data_a != nullptr); - { - nano::lock_guard lock{ mutex }; - full.push_back (data_a); - } - condition.notify_all (); -} - -nano::message_buffer * nano::message_buffer_manager::dequeue () -{ - nano::unique_lock lock{ mutex }; - while (!stopped && full.empty ()) - { - condition.wait (lock); - } - nano::message_buffer * result (nullptr); - if (!full.empty ()) - { - result = full.front (); - full.pop_front (); - } - return result; -} - -void nano::message_buffer_manager::release (nano::message_buffer * data_a) -{ - debug_assert (data_a != nullptr); - { - nano::lock_guard lock{ mutex }; - free.push_back (data_a); - } - condition.notify_all (); -} - -void nano::message_buffer_manager::stop () -{ - { - nano::lock_guard lock{ mutex }; - stopped = true; - } - condition.notify_all (); -} - nano::tcp_message_manager::tcp_message_manager (unsigned incoming_connections_max_a) : max_entries (incoming_connections_max_a * nano::tcp_message_manager::max_entries_per_connection + 1) { diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 8bbece2f..dd5051a4 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -7,65 +7,13 @@ #include +#include #include -#include #include namespace nano { -class channel; class node; -class stats; -class transaction; -class message_buffer final -{ -public: - uint8_t * buffer{ nullptr }; - std::size_t size{ 0 }; - nano::endpoint endpoint; -}; -/** - * A circular buffer for servicing nano realtime messages. - * This container follows a producer/consumer model where the operating system is producing data in to - * buffers which are serviced by internal threads. - * If buffers are not serviced fast enough they're internally dropped. - * This container has a maximum space to hold N buffers of M size and will allocate them in round-robin order. - * All public methods are thread-safe -*/ -class message_buffer_manager final -{ -public: - // Stats - Statistics - // Size - Size of each individual buffer - // Count - Number of buffers to allocate - message_buffer_manager (nano::stats & stats, std::size_t, std::size_t); - // Return a buffer where message data can be put - // Method will attempt to return the first free buffer - // If there are no free buffers, an unserviced buffer will be dequeued and returned - // Function will block if there are no free or unserviced buffers - // Return nullptr if the container has stopped - nano::message_buffer * allocate (); - // Queue a buffer that has been filled with message data and notify servicing threads - void enqueue (nano::message_buffer *); - // Return a buffer that has been filled with message data - // Function will block until a buffer has been added - // Return nullptr if the container has stopped - nano::message_buffer * dequeue (); - // Return a buffer to the freelist after is has been serviced - void release (nano::message_buffer *); - // Stop container and notify waiting threads - void stop (); - -private: - nano::stats & stats; - nano::mutex mutex; - nano::condition_variable condition; - boost::circular_buffer free; - boost::circular_buffer full; - std::vector slab; - std::vector entries; - bool stopped; -}; class tcp_message_manager final { public: @@ -181,7 +129,6 @@ private: public: std::function const &)> inbound; - nano::message_buffer_manager buffer_container; boost::asio::ip::udp::resolver resolver; std::vector packet_processing_threads; nano::peer_exclusion excluded_peers;