Move TCP messages processing to network threads (#2613)
* Move TCP realtime messages prosessing to network threads * Add timeout for run_next () * Always log network threads processing exceptions * Remove unnecessary statements (Wesley) * Comment for not removing requests.front ()
This commit is contained in:
parent
a4d4675fa2
commit
a0233d903c
6 changed files with 178 additions and 64 deletions
|
@ -520,22 +520,22 @@ void nano::bootstrap_server::receive_node_id_handshake_action (boost::system::er
|
|||
void nano::bootstrap_server::add_request (std::unique_ptr<nano::message> message_a)
|
||||
{
|
||||
debug_assert (message_a != nullptr);
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
auto start (requests.empty ());
|
||||
requests.push (std::move (message_a));
|
||||
if (start)
|
||||
{
|
||||
run_next ();
|
||||
run_next (lock);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_server::finish_request ()
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
requests.pop ();
|
||||
if (!requests.empty ())
|
||||
{
|
||||
run_next ();
|
||||
run_next (lock);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -595,35 +595,19 @@ public:
|
|||
}
|
||||
void keepalive (nano::keepalive const & message_a) override
|
||||
{
|
||||
connection->finish_request_async ();
|
||||
auto connection_l (connection->shared_from_this ());
|
||||
connection->node->background ([connection_l, message_a]() {
|
||||
connection_l->node->network.tcp_channels.process_keepalive (message_a, connection_l->remote_endpoint);
|
||||
});
|
||||
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::keepalive> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
|
||||
}
|
||||
void publish (nano::publish const & message_a) override
|
||||
{
|
||||
connection->finish_request_async ();
|
||||
auto connection_l (connection->shared_from_this ());
|
||||
connection->node->background ([connection_l, message_a]() {
|
||||
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
|
||||
});
|
||||
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::publish> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
|
||||
}
|
||||
void confirm_req (nano::confirm_req const & message_a) override
|
||||
{
|
||||
connection->finish_request_async ();
|
||||
auto connection_l (connection->shared_from_this ());
|
||||
connection->node->background ([connection_l, message_a]() {
|
||||
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
|
||||
});
|
||||
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::confirm_req> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
|
||||
}
|
||||
void confirm_ack (nano::confirm_ack const & message_a) override
|
||||
{
|
||||
connection->finish_request_async ();
|
||||
auto connection_l (connection->shared_from_this ());
|
||||
connection->node->background ([connection_l, message_a]() {
|
||||
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
|
||||
});
|
||||
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::confirm_ack> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
|
||||
}
|
||||
void bulk_pull (nano::bulk_pull const &) override
|
||||
{
|
||||
|
@ -647,19 +631,11 @@ public:
|
|||
}
|
||||
void telemetry_req (nano::telemetry_req const & message_a) override
|
||||
{
|
||||
connection->finish_request_async ();
|
||||
auto connection_l (connection->shared_from_this ());
|
||||
connection->node->background ([connection_l, message_a]() {
|
||||
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
|
||||
});
|
||||
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::telemetry_req> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
|
||||
}
|
||||
void telemetry_ack (nano::telemetry_ack const & message_a) override
|
||||
{
|
||||
connection->finish_request_async ();
|
||||
auto connection_l (connection->shared_from_this ());
|
||||
connection->node->background ([connection_l, message_a]() {
|
||||
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
|
||||
});
|
||||
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::telemetry_ack> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
|
||||
}
|
||||
void node_id_handshake (nano::node_id_handshake const & message_a) override
|
||||
{
|
||||
|
@ -717,20 +693,42 @@ public:
|
|||
nano::account node_id (connection->remote_node_id);
|
||||
nano::bootstrap_server_type type (connection->type);
|
||||
debug_assert (node_id.is_zero () || type == nano::bootstrap_server_type::realtime);
|
||||
auto connection_l (connection->shared_from_this ());
|
||||
connection->node->background ([connection_l, message_a, node_id, type]() {
|
||||
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, node_id, connection_l->socket, type);
|
||||
});
|
||||
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::node_id_handshake> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
|
||||
}
|
||||
std::shared_ptr<nano::bootstrap_server> connection;
|
||||
};
|
||||
}
|
||||
|
||||
void nano::bootstrap_server::run_next ()
|
||||
void nano::bootstrap_server::run_next (nano::unique_lock<std::mutex> & lock_a)
|
||||
{
|
||||
debug_assert (!requests.empty ());
|
||||
request_response_visitor visitor (shared_from_this ());
|
||||
requests.front ()->visit (visitor);
|
||||
auto type (requests.front ()->header.type);
|
||||
if (type == nano::message_type::bulk_pull || type == nano::message_type::bulk_pull_account || type == nano::message_type::bulk_push || type == nano::message_type::frontier_req || type == nano::message_type::node_id_handshake)
|
||||
{
|
||||
// Bootstrap & node ID (realtime start)
|
||||
// Request removed from queue in request_response_visitor. For bootstrap with requests.front ().release (), for node ID with finish_request ()
|
||||
requests.front ()->visit (visitor);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Realtime
|
||||
auto request (std::move (requests.front ()));
|
||||
requests.pop ();
|
||||
auto timeout_check (requests.empty ());
|
||||
lock_a.unlock ();
|
||||
request->visit (visitor);
|
||||
if (timeout_check)
|
||||
{
|
||||
std::weak_ptr<nano::bootstrap_server> this_w (shared_from_this ());
|
||||
node->alarm.add (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() {
|
||||
if (auto this_l = this_w.lock ())
|
||||
{
|
||||
this_l->timeout ();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool nano::bootstrap_server::is_bootstrap_connection ()
|
||||
|
|
|
@ -62,7 +62,7 @@ public:
|
|||
void finish_request ();
|
||||
void finish_request_async ();
|
||||
void timeout ();
|
||||
void run_next ();
|
||||
void run_next (nano::unique_lock<std::mutex> & lock_a);
|
||||
bool is_bootstrap_connection ();
|
||||
bool is_realtime_connection ();
|
||||
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
nano::network::network (nano::node & node_a, uint16_t port_a) :
|
||||
syn_cookies (node_a.network_params.node.max_peers_per_ip),
|
||||
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
|
||||
tcp_message_manager (node_a.stats, node_a.config.tcp_incoming_connections_max),
|
||||
resolver (node_a.io_ctx),
|
||||
limiter (node_a.config.bandwidth_limit_burst_ratio, node_a.config.bandwidth_limit),
|
||||
node (node_a),
|
||||
|
@ -24,6 +25,7 @@ disconnect_observer ([]() {})
|
|||
{
|
||||
boost::thread::attributes attrs;
|
||||
nano::thread_attributes::set (attrs);
|
||||
// UDP
|
||||
for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_udp; ++i)
|
||||
{
|
||||
packet_processing_threads.emplace_back (attrs, [this]() {
|
||||
|
@ -34,27 +36,62 @@ disconnect_observer ([]() {})
|
|||
}
|
||||
catch (boost::system::error_code & ec)
|
||||
{
|
||||
this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ());
|
||||
this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ());
|
||||
release_assert (false);
|
||||
}
|
||||
catch (std::error_code & ec)
|
||||
{
|
||||
this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ());
|
||||
this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ());
|
||||
release_assert (false);
|
||||
}
|
||||
catch (std::runtime_error & err)
|
||||
{
|
||||
this->node.logger.try_log (FATAL_LOG_PREFIX, err.what ());
|
||||
this->node.logger.always_log (FATAL_LOG_PREFIX, err.what ());
|
||||
release_assert (false);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
this->node.logger.try_log (FATAL_LOG_PREFIX, "Unknown exception");
|
||||
this->node.logger.always_log (FATAL_LOG_PREFIX, "Unknown exception");
|
||||
release_assert (false);
|
||||
}
|
||||
if (this->node.config.logging.network_packet_logging ())
|
||||
{
|
||||
this->node.logger.try_log ("Exiting packet processing thread");
|
||||
this->node.logger.try_log ("Exiting UDP packet processing thread");
|
||||
}
|
||||
});
|
||||
}
|
||||
// TCP
|
||||
for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i)
|
||||
{
|
||||
packet_processing_threads.emplace_back (attrs, [this]() {
|
||||
nano::thread_role::set (nano::thread_role::name::packet_processing);
|
||||
try
|
||||
{
|
||||
tcp_channels.process_messages ();
|
||||
}
|
||||
catch (boost::system::error_code & ec)
|
||||
{
|
||||
this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ());
|
||||
release_assert (false);
|
||||
}
|
||||
catch (std::error_code & ec)
|
||||
{
|
||||
this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ());
|
||||
release_assert (false);
|
||||
}
|
||||
catch (std::runtime_error & err)
|
||||
{
|
||||
this->node.logger.always_log (FATAL_LOG_PREFIX, err.what ());
|
||||
release_assert (false);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
this->node.logger.always_log (FATAL_LOG_PREFIX, "Unknown exception");
|
||||
release_assert (false);
|
||||
}
|
||||
if (this->node.config.logging.network_packet_logging ())
|
||||
{
|
||||
this->node.logger.try_log ("Exiting TCP packet processing thread");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -89,6 +126,7 @@ void nano::network::stop ()
|
|||
tcp_channels.stop ();
|
||||
resolver.cancel ();
|
||||
buffer_container.stop ();
|
||||
tcp_message_manager.stop ();
|
||||
port = 0;
|
||||
for (auto & thread : packet_processing_threads)
|
||||
{
|
||||
|
@ -369,6 +407,13 @@ public:
|
|||
}
|
||||
node.stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in);
|
||||
node.network.merge_peers (message_a.peers);
|
||||
// Check for special node port data
|
||||
auto peer0 (message_a.peers[0]);
|
||||
if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
|
||||
{
|
||||
nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ());
|
||||
node.network.merge_peer (new_endpoint);
|
||||
}
|
||||
}
|
||||
void publish (nano::publish const & message_a) override
|
||||
{
|
||||
|
@ -836,6 +881,54 @@ void nano::message_buffer_manager::stop ()
|
|||
condition.notify_all ();
|
||||
}
|
||||
|
||||
nano::tcp_message_manager::tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a) :
|
||||
stats (stats_a),
|
||||
max_entries (incoming_connections_max_a * nano::tcp_message_manager::max_entries_per_connection + 1)
|
||||
{
|
||||
debug_assert (max_entries > 0);
|
||||
}
|
||||
|
||||
void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item_a)
|
||||
{
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
while (entries.size () > max_entries && !stopped)
|
||||
{
|
||||
condition.wait (lock);
|
||||
}
|
||||
entries.push_back (item_a);
|
||||
}
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
nano::tcp_message_item nano::tcp_message_manager::get_message ()
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
while (entries.empty () && !stopped)
|
||||
{
|
||||
condition.wait (lock);
|
||||
}
|
||||
if (!entries.empty ())
|
||||
{
|
||||
auto result (entries.front ());
|
||||
entries.pop_front ();
|
||||
return result;
|
||||
}
|
||||
else
|
||||
{
|
||||
return nano::tcp_message_item{ std::make_shared<nano::keepalive> (), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined };
|
||||
}
|
||||
}
|
||||
|
||||
void nano::tcp_message_manager::stop ()
|
||||
{
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
stopped = true;
|
||||
}
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
nano::syn_cookies::syn_cookies (size_t max_cookies_per_ip_a) :
|
||||
max_cookies_per_ip (max_cookies_per_ip_a)
|
||||
{
|
||||
|
|
|
@ -66,6 +66,24 @@ private:
|
|||
std::vector<nano::message_buffer> entries;
|
||||
bool stopped;
|
||||
};
|
||||
class tcp_message_manager final
|
||||
{
|
||||
public:
|
||||
tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a);
|
||||
void put_message (nano::tcp_message_item const & item_a);
|
||||
nano::tcp_message_item get_message ();
|
||||
// Stop container and notify waiting threads
|
||||
void stop ();
|
||||
|
||||
private:
|
||||
nano::stat & stats;
|
||||
std::mutex mutex;
|
||||
nano::condition_variable condition;
|
||||
std::deque<nano::tcp_message_item> entries;
|
||||
unsigned max_entries;
|
||||
static unsigned const max_entries_per_connection = 16;
|
||||
bool stopped{ false };
|
||||
};
|
||||
/**
|
||||
* Node ID cookies for node ID handshakes
|
||||
*/
|
||||
|
@ -157,6 +175,7 @@ public:
|
|||
std::vector<boost::thread> packet_processing_threads;
|
||||
nano::bandwidth_limiter limiter;
|
||||
nano::peer_exclusion excluded_peers;
|
||||
nano::tcp_message_manager tcp_message_manager;
|
||||
nano::node & node;
|
||||
nano::network_filter publish_filter;
|
||||
nano::transport::udp_channels udp_channels;
|
||||
|
|
|
@ -259,6 +259,18 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer (uint8_t connec
|
|||
return result;
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::process_messages ()
|
||||
{
|
||||
while (!stopped)
|
||||
{
|
||||
auto item (node.network.tcp_message_manager.get_message ());
|
||||
if (item.message != nullptr)
|
||||
{
|
||||
process_message (*item.message, item.endpoint, item.node_id, item.socket, item.type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a, std::shared_ptr<nano::socket> socket_a, nano::bootstrap_server_type type_a)
|
||||
{
|
||||
if (!stopped && message_a.header.version_using >= protocol_constants ().protocol_version_min (node.ledger.cache.epoch_2_started))
|
||||
|
@ -308,23 +320,6 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
|
|||
}
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::process_keepalive (nano::keepalive const & message_a, nano::tcp_endpoint const & endpoint_a)
|
||||
{
|
||||
if (!max_ip_connections (endpoint_a))
|
||||
{
|
||||
// Check for special node port data
|
||||
auto peer0 (message_a.peers[0]);
|
||||
if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
|
||||
{
|
||||
nano::endpoint new_endpoint (endpoint_a.address (), peer0.port ());
|
||||
node.network.merge_peer (new_endpoint);
|
||||
}
|
||||
// Used to store sender endpoint information only
|
||||
auto udp_channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a), node.network_params.protocol.protocol_version));
|
||||
node.network.process_message (message_a, udp_channel);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::start ()
|
||||
{
|
||||
ongoing_keepalive ();
|
||||
|
|
|
@ -18,6 +18,15 @@ namespace nano
|
|||
{
|
||||
class bootstrap_server;
|
||||
enum class bootstrap_server_type;
|
||||
class tcp_message_item final
|
||||
{
|
||||
public:
|
||||
std::shared_ptr<nano::message> message;
|
||||
nano::tcp_endpoint endpoint;
|
||||
nano::account node_id;
|
||||
std::shared_ptr<nano::socket> socket;
|
||||
nano::bootstrap_server_type type;
|
||||
};
|
||||
namespace transport
|
||||
{
|
||||
class tcp_channels;
|
||||
|
@ -95,8 +104,8 @@ namespace transport
|
|||
void receive ();
|
||||
void start ();
|
||||
void stop ();
|
||||
void process_messages ();
|
||||
void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr<nano::socket>, nano::bootstrap_server_type);
|
||||
void process_keepalive (nano::keepalive const &, nano::tcp_endpoint const &);
|
||||
bool max_ip_connections (nano::tcp_endpoint const &);
|
||||
// Should we reach out to this endpoint with a keepalive message
|
||||
bool reachout (nano::endpoint const &);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue