Service UDP receive socket more frequently (#1210)

Author: clemahieu, 2018-09-21 07:46:09 -07:00, committed by Roy Keene
commit 6bdc7aa877
5 changed files with 425 additions and 82 deletions

@@ -55,9 +55,10 @@ TEST (network, construction)
TEST (network, self_discard)
{
rai::system system (24000, 1);
system.nodes[0]->network.remote = system.nodes[0]->network.endpoint ();
rai::udp_data data;
data.endpoint = system.nodes[0]->network.endpoint ();
ASSERT_EQ (0, system.nodes[0]->stats.count (rai::stat::type::error, rai::stat::detail::bad_sender));
system.nodes[0]->network.receive_action (boost::system::error_code{}, 0);
system.nodes[0]->network.receive_action (&data);
ASSERT_EQ (1, system.nodes[0]->stats.count (rai::stat::type::error, rai::stat::detail::bad_sender));
}
@@ -930,3 +931,151 @@ TEST (node, port_mapping)
system.poll ();
}
}
TEST (udp_buffer, one_buffer)
{
rai::stat stats;
rai::udp_buffer buffer (stats, 512, 1);
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
buffer.enqueue (buffer1);
auto buffer2 (buffer.dequeue ());
ASSERT_EQ (buffer1, buffer2);
buffer.release (buffer2);
auto buffer3 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer3);
}
TEST (udp_buffer, two_buffers)
{
rai::stat stats;
rai::udp_buffer buffer (stats, 512, 2);
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
auto buffer2 (buffer.allocate ());
ASSERT_NE (nullptr, buffer2);
ASSERT_NE (buffer1, buffer2);
buffer.enqueue (buffer2);
buffer.enqueue (buffer1);
auto buffer3 (buffer.dequeue ());
ASSERT_EQ (buffer2, buffer3);
auto buffer4 (buffer.dequeue ());
ASSERT_EQ (buffer1, buffer4);
buffer.release (buffer3);
buffer.release (buffer4);
auto buffer5 (buffer.allocate ());
ASSERT_EQ (buffer2, buffer5);
auto buffer6 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer6);
}
TEST (udp_buffer, one_overflow)
{
rai::stat stats;
rai::udp_buffer buffer (stats, 512, 1);
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
buffer.enqueue (buffer1);
auto buffer2 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer2);
}
TEST (udp_buffer, two_overflow)
{
rai::stat stats;
rai::udp_buffer buffer (stats, 512, 2);
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
buffer.enqueue (buffer1);
auto buffer2 (buffer.allocate ());
ASSERT_NE (nullptr, buffer2);
ASSERT_NE (buffer1, buffer2);
buffer.enqueue (buffer2);
auto buffer3 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer3);
auto buffer4 (buffer.allocate ());
ASSERT_EQ (buffer2, buffer4);
}
TEST (udp_buffer, one_buffer_multithreaded)
{
rai::stat stats;
rai::udp_buffer buffer (stats, 512, 1);
std::thread thread ([&buffer]() {
auto done (false);
while (!done)
{
auto item (buffer.dequeue ());
done = item == nullptr;
if (item != nullptr)
{
buffer.release (item);
}
}
});
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
buffer.enqueue (buffer1);
auto buffer2 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer2);
buffer.stop ();
thread.join ();
}
TEST (udp_buffer, many_buffers_multithreaded)
{
rai::stat stats;
rai::udp_buffer buffer (stats, 512, 16);
std::vector<std::thread> threads;
for (auto i (0); i < 4; ++i)
{
threads.push_back (std::thread ([&buffer]() {
auto done (false);
while (!done)
{
auto item (buffer.dequeue ());
done = item == nullptr;
if (item != nullptr)
{
buffer.release (item);
}
}
}));
}
std::atomic_int count (0);
for (auto i (0); i < 4; ++i)
{
threads.push_back (std::thread ([&buffer, &count]() {
auto done (false);
for (auto i (0); !done && i < 1000; ++i)
{
auto item (buffer.allocate ());
done = item == nullptr;
if (item != nullptr)
{
buffer.enqueue (item);
++count;
if (count > 3000)
{
buffer.stop ();
}
}
}
}));
}
buffer.stop ();
for (auto & i : threads)
{
i.join ();
}
}
TEST (udp_buffer, stats)
{
rai::stat stats;
rai::udp_buffer buffer (stats, 512, 1);
auto buffer1 (buffer.allocate ());
buffer.enqueue (buffer1);
buffer.allocate ();
ASSERT_EQ (1, stats.count (rai::stat::type::udp, rai::stat::detail::overflow));
}

@@ -40,11 +40,45 @@ rai::endpoint rai::map_endpoint_to_v6 (rai::endpoint const & endpoint_a)
}
rai::network::network (rai::node & node_a, uint16_t port) :
buffer_container (node_a.stats, rai::network::buffer_size, 4096), // 2MB receive buffer (4096 buffers of 512 bytes)
socket (node_a.service, rai::endpoint (boost::asio::ip::address_v6::any (), port)),
resolver (node_a.service),
node (node_a),
on (true)
{
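// One packet processing (consumer) thread per configured io thread; each runs process_packets () until the buffer container is stopped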
for (size_t i = 0; i < node.config.io_threads; ++i)
{
packet_processing_threads.push_back (std::thread ([this]() {
try
{
process_packets ();
}
catch (...)
{
release_assert (false);
}
if (this->node.config.logging.network_packet_logging ())
{
BOOST_LOG (this->node.log) << "Exiting packet processing thread";
}
}));
}
}
rai::network::~network ()
{
for (auto & thread : packet_processing_threads)
{
thread.join ();
}
}
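// Queue one outstanding asynchronous receive per io thread so the UDP socket is serviced more frequently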
void rai::network::start ()
{
for (size_t i = 0; i < node.config.io_threads; ++i)
{
receive ();
}
}
void rai::network::receive ()
@@ -54,16 +88,53 @@ void rai::network::receive ()
BOOST_LOG (node.log) << "Receiving packet";
}
std::unique_lock<std::mutex> lock (socket_mutex);
socket.async_receive_from (boost::asio::buffer (buffer.data (), buffer.size ()), remote, [this](boost::system::error_code const & error, size_t size_a) {
receive_action (error, size_a);
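// Take a buffer from the container (recycling the oldest unserviced one if none are free) and receive the next datagram into it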
auto data (buffer_container.allocate ());
socket.async_receive_from (boost::asio::buffer (data->buffer, rai::network::buffer_size), data->endpoint, [this, data](boost::system::error_code const & error, size_t size_a) {
if (!error && this->on)
{
data->size = size_a;
this->buffer_container.enqueue (data);
this->receive ();
}
else
{
this->buffer_container.release (data);
if (error)
{
if (this->node.config.logging.network_logging ())
{
BOOST_LOG (this->node.log) << boost::str (boost::format ("UDP Receive error: %1%") % error.message ());
}
}
if (this->on)
{
this->node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() { this->receive (); });
}
}
});
}
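// Consumer loop run by each packet processing thread: wait for filled buffers, hand them to receive_action () and return them to the free list; exits once dequeue () returns nullptr after stop ()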
void rai::network::process_packets ()
{
while (on)
{
auto data (buffer_container.dequeue ());
if (data == nullptr)
{
break;
}
//std::cerr << data->endpoint.address ().to_string ();
receive_action (data);
buffer_container.release (data);
}
}
void rai::network::stop ()
{
on = false;
socket.close ();
resolver.cancel ();
buffer_container.stop ();
}
void rai::network::send_keepalive (rai::endpoint const & endpoint_a)
@@ -486,112 +557,94 @@ public:
};
}
void rai::network::receive_action (boost::system::error_code const & error, size_t size_a)
void rai::network::receive_action (rai::udp_data * data_a)
{
if (!error && on)
if (!rai::reserved_address (data_a->endpoint, false) && data_a->endpoint != endpoint ())
{
if (!rai::reserved_address (remote, false) && remote != endpoint ())
network_message_visitor visitor (node, data_a->endpoint);
rai::message_parser parser (visitor, node.work);
parser.deserialize_buffer (data_a->buffer, data_a->size);
if (parser.status != rai::message_parser::parse_status::success)
{
network_message_visitor visitor (node, remote);
rai::message_parser parser (visitor, node.work);
parser.deserialize_buffer (buffer.data (), size_a);
if (parser.status != rai::message_parser::parse_status::success)
node.stats.inc (rai::stat::type::error);
if (parser.status == rai::message_parser::parse_status::insufficient_work)
{
node.stats.inc (rai::stat::type::error);
if (node.config.logging.insufficient_work_logging ())
{
BOOST_LOG (node.log) << "Insufficient work in message";
}
if (parser.status == rai::message_parser::parse_status::insufficient_work)
// We've already incremented the error count, update detail only
node.stats.inc_detail_only (rai::stat::type::error, rai::stat::detail::insufficient_work);
}
else if (parser.status == rai::message_parser::parse_status::invalid_message_type)
{
if (node.config.logging.network_logging ())
{
if (node.config.logging.insufficient_work_logging ())
{
BOOST_LOG (node.log) << "Insufficient work in message";
}
// We've already incremented the error count, update detail only
node.stats.inc_detail_only (rai::stat::type::error, rai::stat::detail::insufficient_work);
BOOST_LOG (node.log) << "Invalid message type in message";
}
else if (parser.status == rai::message_parser::parse_status::invalid_message_type)
}
else if (parser.status == rai::message_parser::parse_status::invalid_header)
{
if (node.config.logging.network_logging ())
{
if (node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << "Invalid message type in message";
}
BOOST_LOG (node.log) << "Invalid header in message";
}
else if (parser.status == rai::message_parser::parse_status::invalid_header)
}
else if (parser.status == rai::message_parser::parse_status::invalid_keepalive_message)
{
if (node.config.logging.network_logging ())
{
if (node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << "Invalid header in message";
}
BOOST_LOG (node.log) << "Invalid keepalive message";
}
else if (parser.status == rai::message_parser::parse_status::invalid_keepalive_message)
}
else if (parser.status == rai::message_parser::parse_status::invalid_publish_message)
{
if (node.config.logging.network_logging ())
{
if (node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << "Invalid keepalive message";
}
BOOST_LOG (node.log) << "Invalid publish message";
}
else if (parser.status == rai::message_parser::parse_status::invalid_publish_message)
}
else if (parser.status == rai::message_parser::parse_status::invalid_confirm_req_message)
{
if (node.config.logging.network_logging ())
{
if (node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << "Invalid publish message";
}
BOOST_LOG (node.log) << "Invalid confirm_req message";
}
else if (parser.status == rai::message_parser::parse_status::invalid_confirm_req_message)
}
else if (parser.status == rai::message_parser::parse_status::invalid_confirm_ack_message)
{
if (node.config.logging.network_logging ())
{
if (node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << "Invalid confirm_req message";
}
BOOST_LOG (node.log) << "Invalid confirm_ack message";
}
else if (parser.status == rai::message_parser::parse_status::invalid_confirm_ack_message)
}
else if (parser.status == rai::message_parser::parse_status::invalid_node_id_handshake_message)
{
if (node.config.logging.network_logging ())
{
if (node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << "Invalid confirm_ack message";
}
}
else if (parser.status == rai::message_parser::parse_status::invalid_node_id_handshake_message)
{
if (node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << "Invalid node_id_handshake message";
}
}
else
{
BOOST_LOG (node.log) << "Could not deserialize buffer";
BOOST_LOG (node.log) << "Invalid node_id_handshake message";
}
}
else
{
node.stats.add (rai::stat::type::traffic, rai::stat::dir::in, size_a);
BOOST_LOG (node.log) << "Could not deserialize buffer";
}
}
else
{
if (node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Reserved sender %1%") % remote.address ().to_string ());
}
node.stats.inc_detail_only (rai::stat::type::error, rai::stat::detail::bad_sender);
node.stats.add (rai::stat::type::traffic, rai::stat::dir::in, data_a->size);
}
receive ();
}
else
{
if (error)
if (node.config.logging.network_logging ())
{
if (node.config.logging.network_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("UDP Receive error: %1%") % error.message ());
}
}
if (on)
{
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() { receive (); });
BOOST_LOG (node.log) << boost::str (boost::format ("Reserved sender %1%") % data_a->endpoint.address ().to_string ());
}
node.stats.inc_detail_only (rai::stat::type::error, rai::stat::detail::bad_sender);
}
}
@@ -2269,7 +2322,7 @@ bool rai::parse_tcp_endpoint (std::string const & string, rai::tcp_endpoint & en
void rai::node::start ()
{
network.receive ();
network.start ();
ongoing_keepalive ();
ongoing_syn_cookie_cleanup ();
ongoing_bootstrap ();
@@ -4299,3 +4352,79 @@ void rai::port_mapping::stop ()
freeUPNPDevlist (devices);
devices = nullptr;
}
rai::udp_buffer::udp_buffer (rai::stat & stats, size_t size, size_t count) :
stats (stats),
free (count),
full (count),
slab (size * count),
entries (count),
stopped (false)
{
assert (count > 0);
assert (size > 0);
auto slab_data (slab.data ());
auto entry_data (entries.data ());
for (auto i (0); i < count; ++i, ++entry_data)
{
*entry_data = { slab_data + i * size, 0, rai::endpoint () };
free.push_back (entry_data);
}
}
rai::udp_data * rai::udp_buffer::allocate ()
{
std::unique_lock<std::mutex> lock (mutex);
while (!stopped && free.empty () && full.empty ())
{
stats.inc (rai::stat::type::udp, rai::stat::detail::blocking, rai::stat::dir::in);
condition.wait (lock);
}
rai::udp_data * result (nullptr);
if (!free.empty ())
{
result = free.front ();
free.pop_front ();
}
if (result == nullptr)
{
result = full.front ();
full.pop_front ();
stats.inc (rai::stat::type::udp, rai::stat::detail::overflow, rai::stat::dir::in);
}
return result;
}
void rai::udp_buffer::enqueue (rai::udp_data * data_a)
{
assert (data_a != nullptr);
std::lock_guard<std::mutex> lock (mutex);
full.push_back (data_a);
condition.notify_one ();
}
rai::udp_data * rai::udp_buffer::dequeue ()
{
std::unique_lock<std::mutex> lock (mutex);
while (!stopped && full.empty ())
{
condition.wait (lock);
}
rai::udp_data * result (nullptr);
if (!full.empty ())
{
result = full.front ();
full.pop_front ();
}
return result;
}
void rai::udp_buffer::release (rai::udp_data * data_a)
{
assert (data_a != nullptr);
std::lock_guard<std::mutex> lock (mutex);
free.push_back (data_a);
condition.notify_one ();
}
void rai::udp_buffer::stop ()
{
std::lock_guard<std::mutex> lock (mutex);
stopped = true;
condition.notify_all ();
}
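
The allocate / enqueue / dequeue / release calls above form a bounded producer/consumer cycle: the receive handler fills a buffer and queues it, the packet processing threads drain and return it, and stop () unblocks both sides. A minimal standalone sketch of that cycle follows; it is illustrative only and assumes the node headers are reachable as <rai/node/node.hpp> (the thread and iteration counts are arbitrary).

// Illustrative sketch: drives rai::udp_buffer the same way network::receive ()
// and network::process_packets () do. Assumes <rai/node/node.hpp> is available.
#include <rai/node/node.hpp>

#include <thread>
#include <vector>

int main ()
{
	rai::stat stats;
	rai::udp_buffer buffer (stats, 512, 8); // 8 buffers of 512 bytes each
	// Consumers mirror process_packets (): dequeue () blocks until data is queued
	// and returns nullptr once the container is stopped
	std::vector<std::thread> consumers;
	for (auto i (0); i < 2; ++i)
	{
		consumers.push_back (std::thread ([&buffer]() {
			auto done (false);
			while (!done)
			{
				auto data (buffer.dequeue ());
				done = data == nullptr;
				if (data != nullptr)
				{
					// a real consumer would hand data to receive_action () here
					buffer.release (data);
				}
			}
		}));
	}
	// Producer mirrors the async_receive_from completion handler: allocate () hands
	// back a free buffer, recycling the oldest unserviced one if none are free
	for (auto i (0); i < 1000; ++i)
	{
		auto data (buffer.allocate ());
		data->size = 0; // the socket would record the received datagram length here
		buffer.enqueue (data);
	}
	buffer.stop ();
	for (auto & thread : consumers)
	{
		thread.join ();
	}
	return 0;
}

When producers outrun the consumers, allocate () keeps the socket serviced by reusing the oldest queued datagram and counting the loss under rai::stat::detail::overflow, which is what TEST (udp_buffer, stats) above asserts.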

@@ -388,13 +388,63 @@ private:
std::mutex mutex;
rai::node & node;
};
class udp_data
{
public:
uint8_t * buffer;
size_t size;
rai::endpoint endpoint;
};
/**
* A circular buffer for servicing UDP datagrams. This container follows a producer/consumer model where the operating system produces data into buffers that are serviced by internal threads.
* If buffers are not serviced fast enough, they are internally dropped.
* This container has a maximum space to hold N buffers of M size and will allocate them in round-robin order.
* All public methods are thread-safe.
*/
class udp_buffer
{
public:
// Stats - Statistics container used to record blocking and overflow events
// Size - Size of each individual buffer
// Count - Number of buffers to allocate
udp_buffer (rai::stat & stats, size_t, size_t);
// Return a buffer where UDP data can be put
// Method will attempt to return the first free buffer
// If there are no free buffers, an unserviced buffer will be dequeued and returned
// Function will block if there are no free or unserviced buffers
// Return nullptr if the container has stopped
rai::udp_data * allocate ();
// Queue a buffer that has been filled with UDP data and notify servicing threads
void enqueue (rai::udp_data *);
// Return a buffer that has been filled with UDP data
// Function will block until a buffer has been added
// Return nullptr if the container has stopped
rai::udp_data * dequeue ();
// Return a buffer to the free list after it has been serviced
void release (rai::udp_data *);
// Stop container and notify waiting threads
void stop ();
private:
rai::stat & stats;
std::mutex mutex;
std::condition_variable condition;
boost::circular_buffer<rai::udp_data *> free;
boost::circular_buffer<rai::udp_data *> full;
std::vector<uint8_t> slab;
std::vector<rai::udp_data> entries;
bool stopped;
};
class network
{
public:
network (rai::node &, uint16_t);
~network ();
void receive ();
void process_packets ();
void start ();
void stop ();
void receive_action (boost::system::error_code const &, size_t);
void receive_action (rai::udp_data *);
void rpc_action (boost::system::error_code const &, size_t);
void republish_vote (std::shared_ptr<rai::vote>);
void republish_block (rai::transaction const &, std::shared_ptr<rai::block>, bool = true);
@@ -409,14 +459,15 @@ public:
void send_confirm_req (rai::endpoint const &, std::shared_ptr<rai::block>);
void send_buffer (uint8_t const *, size_t, rai::endpoint const &, std::function<void(boost::system::error_code const &, size_t)>);
rai::endpoint endpoint ();
rai::endpoint remote;
std::array<uint8_t, 512> buffer;
rai::udp_buffer buffer_container;
boost::asio::ip::udp::socket socket;
std::mutex socket_mutex;
boost::asio::ip::udp::resolver resolver;
std::vector<std::thread> packet_processing_threads;
rai::node & node;
bool on;
static uint16_t const node_port = rai::rai_network == rai::rai_networks::rai_live_network ? 7075 : 54000;
static size_t const buffer_size = 512;
};
class logging
{

@@ -330,6 +330,9 @@ std::string rai::stat::type_to_string (uint32_t key)
case rai::stat::type::ledger:
res = "ledger";
break;
case rai::stat::type::udp:
res = "udp";
break;
case rai::stat::type::peering:
res = "peering";
break;
@@ -430,6 +433,12 @@ std::string rai::stat::detail_to_string (uint32_t key)
case rai::stat::detail::vote_invalid:
res = "vote_invalid";
break;
case rai::stat::detail::blocking:
res = "blocking";
break;
case rai::stat::detail::overflow:
res = "overflow";
break;
}
return res;
}

@@ -185,6 +185,7 @@ public:
vote,
http_callback,
peering,
udp
};
/** Optional detail type */
@@ -227,6 +228,10 @@ public:
vote_replay,
vote_invalid,
// udp
blocking,
overflow,
// peering
handshake,
};