diff --git a/rai/core_test/network.cpp b/rai/core_test/network.cpp index 0ff48ace..04af572e 100644 --- a/rai/core_test/network.cpp +++ b/rai/core_test/network.cpp @@ -55,9 +55,10 @@ TEST (network, construction) TEST (network, self_discard) { rai::system system (24000, 1); - system.nodes[0]->network.remote = system.nodes[0]->network.endpoint (); + rai::udp_data data; + data.endpoint = system.nodes[0]->network.endpoint (); ASSERT_EQ (0, system.nodes[0]->stats.count (rai::stat::type::error, rai::stat::detail::bad_sender)); - system.nodes[0]->network.receive_action (boost::system::error_code{}, 0); + system.nodes[0]->network.receive_action (&data); ASSERT_EQ (1, system.nodes[0]->stats.count (rai::stat::type::error, rai::stat::detail::bad_sender)); } @@ -930,3 +931,151 @@ TEST (node, port_mapping) system.poll (); } } + +TEST (udp_buffer, one_buffer) +{ + rai::stat stats; + rai::udp_buffer buffer (stats, 512, 1); + auto buffer1 (buffer.allocate ()); + ASSERT_NE (nullptr, buffer1); + buffer.enqueue (buffer1); + auto buffer2 (buffer.dequeue ()); + ASSERT_EQ (buffer1, buffer2); + buffer.release (buffer2); + auto buffer3 (buffer.allocate ()); + ASSERT_EQ (buffer1, buffer3); +} + +TEST (udp_buffer, two_buffers) +{ + rai::stat stats; + rai::udp_buffer buffer (stats, 512, 2); + auto buffer1 (buffer.allocate ()); + ASSERT_NE (nullptr, buffer1); + auto buffer2 (buffer.allocate ()); + ASSERT_NE (nullptr, buffer2); + ASSERT_NE (buffer1, buffer2); + buffer.enqueue (buffer2); + buffer.enqueue (buffer1); + auto buffer3 (buffer.dequeue ()); + ASSERT_EQ (buffer2, buffer3); + auto buffer4 (buffer.dequeue ()); + ASSERT_EQ (buffer1, buffer4); + buffer.release (buffer3); + buffer.release (buffer4); + auto buffer5 (buffer.allocate ()); + ASSERT_EQ (buffer2, buffer5); + auto buffer6 (buffer.allocate ()); + ASSERT_EQ (buffer1, buffer6); +} + +TEST (udp_buffer, one_overflow) +{ + rai::stat stats; + rai::udp_buffer buffer (stats, 512, 1); + auto buffer1 (buffer.allocate ()); + ASSERT_NE 
(nullptr, buffer1); + buffer.enqueue (buffer1); + auto buffer2 (buffer.allocate ()); + ASSERT_EQ (buffer1, buffer2); +} + +TEST (udp_buffer, two_overflow) +{ + rai::stat stats; + rai::udp_buffer buffer (stats, 512, 2); + auto buffer1 (buffer.allocate ()); + ASSERT_NE (nullptr, buffer1); + buffer.enqueue (buffer1); + auto buffer2 (buffer.allocate ()); + ASSERT_NE (nullptr, buffer2); + ASSERT_NE (buffer1, buffer2); + buffer.enqueue (buffer2); + auto buffer3 (buffer.allocate ()); + ASSERT_EQ (buffer1, buffer3); + auto buffer4 (buffer.allocate ()); + ASSERT_EQ (buffer2, buffer4); +} + +TEST (udp_buffer, one_buffer_multithreaded) +{ + rai::stat stats; + rai::udp_buffer buffer (stats, 512, 1); + std::thread thread ([&buffer]() { + auto done (false); + while (!done) + { + auto item (buffer.dequeue ()); + done = item == nullptr; + if (item != nullptr) + { + buffer.release (item); + } + } + }); + auto buffer1 (buffer.allocate ()); + ASSERT_NE (nullptr, buffer1); + buffer.enqueue (buffer1); + auto buffer2 (buffer.allocate ()); + ASSERT_EQ (buffer1, buffer2); + buffer.stop (); + thread.join (); +} + +TEST (udp_buffer, many_buffers_multithreaded) +{ + rai::stat stats; + rai::udp_buffer buffer (stats, 512, 16); + std::vector<std::thread> threads; + for (auto i (0); i < 4; ++i) + { + threads.push_back (std::thread ([&buffer]() { + auto done (false); + while (!done) + { + auto item (buffer.dequeue ()); + done = item == nullptr; + if (item != nullptr) + { + buffer.release (item); + } + } + })); + } + std::atomic_int count (0); + for (auto i (0); i < 4; ++i) + { + threads.push_back (std::thread ([&buffer, &count]() { + auto done (false); + for (auto i (0); !done && i < 1000; ++i) + { + auto item (buffer.allocate ()); + done = item == nullptr; + if (item != nullptr) + { + buffer.enqueue (item); + ++count; + if (count > 3000) + { + buffer.stop (); + } + } + } + })); + } + buffer.stop (); + for (auto & i : threads) + { + i.join (); + } +} + +TEST (udp_buffer, stats) +{ + rai::stat stats; + 
rai::udp_buffer buffer (stats, 512, 1); + auto buffer1 (buffer.allocate ()); + buffer.enqueue (buffer1); + buffer.allocate (); + ASSERT_EQ (1, stats.count (rai::stat::type::udp, rai::stat::detail::overflow)); +} diff --git a/rai/node/node.cpp b/rai/node/node.cpp index 590e688b..7dbe7760 100644 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -40,11 +40,45 @@ rai::endpoint rai::map_endpoint_to_v6 (rai::endpoint const & endpoint_a) } rai::network::network (rai::node & node_a, uint16_t port) : +buffer_container (node_a.stats, rai::network::buffer_size, 4096), // 2Mb receive buffer socket (node_a.service, rai::endpoint (boost::asio::ip::address_v6::any (), port)), resolver (node_a.service), node (node_a), on (true) { + for (size_t i = 0; i < node.config.io_threads; ++i) + { + packet_processing_threads.push_back (std::thread ([this]() { + try + { + process_packets (); + } + catch (...) + { + release_assert (false); + } + if (this->node.config.logging.network_packet_logging ()) + { + BOOST_LOG (this->node.log) << "Exiting packet processing thread"; + } + })); + } +} + +rai::network::~network () +{ + for (auto & thread : packet_processing_threads) + { + thread.join (); + } +} + +void rai::network::start () +{ + for (size_t i = 0; i < node.config.io_threads; ++i) + { + receive (); + } } void rai::network::receive () @@ -54,16 +88,53 @@ void rai::network::receive () BOOST_LOG (node.log) << "Receiving packet"; } std::unique_lock<std::mutex> lock (socket_mutex); - socket.async_receive_from (boost::asio::buffer (buffer.data (), buffer.size ()), remote, [this](boost::system::error_code const & error, size_t size_a) { - receive_action (error, size_a); + auto data (buffer_container.allocate ()); + socket.async_receive_from (boost::asio::buffer (data->buffer, rai::network::buffer_size), data->endpoint, [this, data](boost::system::error_code const & error, size_t size_a) { + if (!error && this->on) + { + data->size = size_a; + this->buffer_container.enqueue (data); + this->receive (); + } + 
else + { + this->buffer_container.release (data); + if (error) + { + if (this->node.config.logging.network_logging ()) + { + BOOST_LOG (this->node.log) << boost::str (boost::format ("UDP Receive error: %1%") % error.message ()); + } + } + if (this->on) + { + this->node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() { this->receive (); }); + } + } }); } +void rai::network::process_packets () +{ + while (on) + { + auto data (buffer_container.dequeue ()); + if (data == nullptr) + { + break; + } + //std::cerr << data->endpoint.address ().to_string (); + receive_action (data); + buffer_container.release (data); + } +} + void rai::network::stop () { on = false; socket.close (); resolver.cancel (); + buffer_container.stop (); } void rai::network::send_keepalive (rai::endpoint const & endpoint_a) @@ -486,112 +557,94 @@ public: }; } -void rai::network::receive_action (boost::system::error_code const & error, size_t size_a) +void rai::network::receive_action (rai::udp_data * data_a) { - if (!error && on) + if (!rai::reserved_address (data_a->endpoint, false) && data_a->endpoint != endpoint ()) { - if (!rai::reserved_address (remote, false) && remote != endpoint ()) + network_message_visitor visitor (node, data_a->endpoint); + rai::message_parser parser (visitor, node.work); + parser.deserialize_buffer (data_a->buffer, data_a->size); + if (parser.status != rai::message_parser::parse_status::success) { - network_message_visitor visitor (node, remote); - rai::message_parser parser (visitor, node.work); - parser.deserialize_buffer (buffer.data (), size_a); - if (parser.status != rai::message_parser::parse_status::success) + node.stats.inc (rai::stat::type::error); + + if (parser.status == rai::message_parser::parse_status::insufficient_work) { - node.stats.inc (rai::stat::type::error); + if (node.config.logging.insufficient_work_logging ()) + { + BOOST_LOG (node.log) << "Insufficient work in message"; + } - if (parser.status == 
rai::message_parser::parse_status::insufficient_work) + // We've already increment error count, update detail only + node.stats.inc_detail_only (rai::stat::type::error, rai::stat::detail::insufficient_work); + } + else if (parser.status == rai::message_parser::parse_status::invalid_message_type) + { + if (node.config.logging.network_logging ()) { - if (node.config.logging.insufficient_work_logging ()) - { - BOOST_LOG (node.log) << "Insufficient work in message"; - } - - // We've already increment error count, update detail only - node.stats.inc_detail_only (rai::stat::type::error, rai::stat::detail::insufficient_work); + BOOST_LOG (node.log) << "Invalid message type in message"; } - else if (parser.status == rai::message_parser::parse_status::invalid_message_type) + } + else if (parser.status == rai::message_parser::parse_status::invalid_header) + { + if (node.config.logging.network_logging ()) { - if (node.config.logging.network_logging ()) - { - BOOST_LOG (node.log) << "Invalid message type in message"; - } + BOOST_LOG (node.log) << "Invalid header in message"; } - else if (parser.status == rai::message_parser::parse_status::invalid_header) + } + else if (parser.status == rai::message_parser::parse_status::invalid_keepalive_message) + { + if (node.config.logging.network_logging ()) { - if (node.config.logging.network_logging ()) - { - BOOST_LOG (node.log) << "Invalid header in message"; - } + BOOST_LOG (node.log) << "Invalid keepalive message"; } - else if (parser.status == rai::message_parser::parse_status::invalid_keepalive_message) + } + else if (parser.status == rai::message_parser::parse_status::invalid_publish_message) + { + if (node.config.logging.network_logging ()) { - if (node.config.logging.network_logging ()) - { - BOOST_LOG (node.log) << "Invalid keepalive message"; - } + BOOST_LOG (node.log) << "Invalid publish message"; } - else if (parser.status == rai::message_parser::parse_status::invalid_publish_message) + } + else if (parser.status == 
rai::message_parser::parse_status::invalid_confirm_req_message) + { + if (node.config.logging.network_logging ()) { - if (node.config.logging.network_logging ()) - { - BOOST_LOG (node.log) << "Invalid publish message"; - } + BOOST_LOG (node.log) << "Invalid confirm_req message"; } - else if (parser.status == rai::message_parser::parse_status::invalid_confirm_req_message) + } + else if (parser.status == rai::message_parser::parse_status::invalid_confirm_ack_message) + { + if (node.config.logging.network_logging ()) { - if (node.config.logging.network_logging ()) - { - BOOST_LOG (node.log) << "Invalid confirm_req message"; - } + BOOST_LOG (node.log) << "Invalid confirm_ack message"; } - else if (parser.status == rai::message_parser::parse_status::invalid_confirm_ack_message) + } + else if (parser.status == rai::message_parser::parse_status::invalid_node_id_handshake_message) + { + if (node.config.logging.network_logging ()) { - if (node.config.logging.network_logging ()) - { - BOOST_LOG (node.log) << "Invalid confirm_ack message"; - } - } - else if (parser.status == rai::message_parser::parse_status::invalid_node_id_handshake_message) - { - if (node.config.logging.network_logging ()) - { - BOOST_LOG (node.log) << "Invalid node_id_handshake message"; - } - } - else - { - BOOST_LOG (node.log) << "Could not deserialize buffer"; + BOOST_LOG (node.log) << "Invalid node_id_handshake message"; } } else { - node.stats.add (rai::stat::type::traffic, rai::stat::dir::in, size_a); + BOOST_LOG (node.log) << "Could not deserialize buffer"; } } else { - if (node.config.logging.network_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Reserved sender %1%") % remote.address ().to_string ()); - } - - node.stats.inc_detail_only (rai::stat::type::error, rai::stat::detail::bad_sender); + node.stats.add (rai::stat::type::traffic, rai::stat::dir::in, data_a->size); } - receive (); } else { - if (error) + if (node.config.logging.network_logging ()) { - if 
(node.config.logging.network_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("UDP Receive error: %1%") % error.message ()); - } - } - if (on) - { - node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() { receive (); }); + BOOST_LOG (node.log) << boost::str (boost::format ("Reserved sender %1%") % data_a->endpoint.address ().to_string ()); } + + node.stats.inc_detail_only (rai::stat::type::error, rai::stat::detail::bad_sender); } } @@ -2269,7 +2322,7 @@ bool rai::parse_tcp_endpoint (std::string const & string, rai::tcp_endpoint & en void rai::node::start () { - network.receive (); + network.start (); ongoing_keepalive (); ongoing_syn_cookie_cleanup (); ongoing_bootstrap (); @@ -4299,3 +4352,79 @@ void rai::port_mapping::stop () freeUPNPDevlist (devices); devices = nullptr; } + +rai::udp_buffer::udp_buffer (rai::stat & stats, size_t size, size_t count) : +stats (stats), +free (count), +full (count), +slab (size * count), +entries (count), +stopped (false) +{ + assert (count > 0); + assert (size > 0); + auto slab_data (slab.data ()); + auto entry_data (entries.data ()); + for (auto i (0); i < count; ++i, ++entry_data) + { + *entry_data = { slab_data + i * size, 0, rai::endpoint () }; + free.push_back (entry_data); + } +} +rai::udp_data * rai::udp_buffer::allocate () +{ + std::unique_lock<std::mutex> lock (mutex); + while (!stopped && free.empty () && full.empty ()) + { + stats.inc (rai::stat::type::udp, rai::stat::detail::blocking, rai::stat::dir::in); + condition.wait (lock); + } + rai::udp_data * result (nullptr); + if (!free.empty ()) + { + result = free.front (); + free.pop_front (); + } + if (result == nullptr) + { + result = full.front (); + full.pop_front (); + stats.inc (rai::stat::type::udp, rai::stat::detail::overflow, rai::stat::dir::in); + } + return result; +} +void rai::udp_buffer::enqueue (rai::udp_data * data_a) +{ + assert (data_a != nullptr); + std::lock_guard<std::mutex> lock (mutex); + full.push_back (data_a); + 
condition.notify_one (); +} +rai::udp_data * rai::udp_buffer::dequeue () +{ + std::unique_lock<std::mutex> lock (mutex); + while (!stopped && full.empty ()) + { + condition.wait (lock); + } + rai::udp_data * result (nullptr); + if (!full.empty ()) + { + result = full.front (); + full.pop_front (); + } + return result; +} +void rai::udp_buffer::release (rai::udp_data * data_a) +{ + assert (data_a != nullptr); + std::lock_guard<std::mutex> lock (mutex); + free.push_back (data_a); + condition.notify_one (); +} +void rai::udp_buffer::stop () +{ + std::lock_guard<std::mutex> lock (mutex); + stopped = true; + condition.notify_all (); +} diff --git a/rai/node/node.hpp b/rai/node/node.hpp index 8d96aa4a..b1e38d80 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -388,13 +388,63 @@ private: std::mutex mutex; rai::node & node; }; +class udp_data +{ +public: + uint8_t * buffer; + size_t size; + rai::endpoint endpoint; +}; +/** + * A circular buffer for servicing UDP datagrams. This container follows a producer/consumer model where the operating system is producing data in to buffers which are serviced by internal threads. + * If buffers are not serviced fast enough they're internally dropped. + * This container has a maximum space to hold N buffers of M size and will allocate them in round-robin order. 
+ * All public methods are thread-safe +*/ +class udp_buffer +{ +public: + // Size - Size of each individual buffer + // Count - Number of buffers to allocate + // Stats - Statistics + udp_buffer (rai::stat & stats, size_t, size_t); + // Return a buffer where UDP data can be put + // Method will attempt to return the first free buffer + // If there are no free buffers, an unserviced buffer will be dequeued and returned + // Function will block if there are no free or unserviced buffers + // Return nullptr if the container has stopped + rai::udp_data * allocate (); + // Queue a buffer that has been filled with UDP data and notify servicing threads + void enqueue (rai::udp_data *); + // Return a buffer that has been filled with UDP data + // Function will block until a buffer has been added + // Return nullptr if the container has stopped + rai::udp_data * dequeue (); + // Return a buffer to the freelist after it has been serviced + void release (rai::udp_data *); + // Stop container and notify waiting threads + void stop (); + +private: + rai::stat & stats; + std::mutex mutex; + std::condition_variable condition; + boost::circular_buffer<rai::udp_data *> free; + boost::circular_buffer<rai::udp_data *> full; + std::vector<uint8_t> slab; + std::vector<rai::udp_data> entries; + bool stopped; +}; class network { public: network (rai::node &, uint16_t); + ~network (); void receive (); + void process_packets (); + void start (); void stop (); - void receive_action (boost::system::error_code const &, size_t); + void receive_action (rai::udp_data *); void rpc_action (boost::system::error_code const &, size_t); void republish_vote (std::shared_ptr<rai::vote>); void republish_block (rai::transaction const &, std::shared_ptr<rai::block>, bool = true); @@ -409,14 +459,15 @@ public: void send_confirm_req (rai::endpoint const &, std::shared_ptr<rai::block>); void send_buffer (uint8_t const *, size_t, rai::endpoint const &, std::function<void(boost::system::error_code const &, size_t)>); rai::endpoint endpoint (); - rai::endpoint remote; - std::array<uint8_t, 512> buffer; + rai::udp_buffer buffer_container; 
boost::asio::ip::udp::socket socket; std::mutex socket_mutex; boost::asio::ip::udp::resolver resolver; + std::vector<std::thread> packet_processing_threads; rai::node & node; bool on; static uint16_t const node_port = rai::rai_network == rai::rai_networks::rai_live_network ? 7075 : 54000; + static size_t const buffer_size = 512; }; class logging { diff --git a/rai/node/stats.cpp b/rai/node/stats.cpp index 971263f5..eef0cea9 100644 --- a/rai/node/stats.cpp +++ b/rai/node/stats.cpp @@ -330,6 +330,9 @@ std::string rai::stat::type_to_string (uint32_t key) case rai::stat::type::ledger: res = "ledger"; break; + case rai::stat::type::udp: + res = "udp"; + break; case rai::stat::type::peering: res = "peering"; break; @@ -430,6 +433,12 @@ std::string rai::stat::detail_to_string (uint32_t key) case rai::stat::detail::vote_invalid: res = "vote_invalid"; break; + case rai::stat::detail::blocking: + res = "blocking"; + break; + case rai::stat::detail::overflow: + res = "overflow"; + break; } return res; } diff --git a/rai/node/stats.hpp b/rai/node/stats.hpp index cdef377c..05e8804b 100644 --- a/rai/node/stats.hpp +++ b/rai/node/stats.hpp @@ -185,6 +185,7 @@ public: vote, http_callback, peering, + udp }; /** Optional detail type */ @@ -227,6 +228,10 @@ public: vote_replay, vote_invalid, + // udp + blocking, + overflow, + // peering handshake, };