Run packet processing from start

This commit is contained in:
Piotr Wójcik 2024-03-13 19:31:02 +01:00 committed by Piotr Wójcik
commit 09a94096b6
2 changed files with 60 additions and 45 deletions

View file

@ -23,43 +23,14 @@ nano::network::network (nano::node & node_a, uint16_t port_a) :
tcp_channels (node_a, [this] (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel) {
inbound (message, channel);
}),
port (port_a), disconnect_observer ([] () {})
port (port_a)
{
for (std::size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i)
{
packet_processing_threads.emplace_back (nano::thread_attributes::get_default (), [this, i] () {
nano::thread_role::set (nano::thread_role::name::packet_processing);
try
{
tcp_channels.process_messages ();
}
catch (boost::system::error_code & ec)
{
node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
release_assert (false);
}
catch (std::error_code & ec)
{
node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
release_assert (false);
}
catch (std::runtime_error & err)
{
node.logger.critical (nano::log::type::network, "Error: {}", err.what ());
release_assert (false);
}
catch (...)
{
node.logger.critical (nano::log::type::network, "Unknown error");
release_assert (false);
}
});
}
}
nano::network::~network ()
{
stop ();
// All threads must be stopped before this destructor
debug_assert (processing_threads.empty ());
}
void nano::network::start ()
@ -68,26 +39,67 @@ void nano::network::start ()
{
ongoing_cleanup ();
}
ongoing_syn_cookie_cleanup ();
ongoing_keepalive ();
if (!node.flags.disable_tcp_realtime)
{
tcp_channels.start ();
for (std::size_t i = 0; i < node.config.network_threads; ++i)
{
processing_threads.emplace_back (nano::thread_attributes::get_default (), [this] () {
nano::thread_role::set (nano::thread_role::name::packet_processing);
run_processing ();
});
}
}
ongoing_keepalive ();
}
void nano::network::stop ()
{
if (!stopped.exchange (true))
stopped = true;
tcp_channels.stop ();
resolver.cancel ();
tcp_message_manager.stop ();
for (auto & thread : processing_threads)
{
tcp_channels.stop ();
resolver.cancel ();
tcp_message_manager.stop ();
port = 0;
for (auto & thread : packet_processing_threads)
{
thread.join ();
}
thread.join ();
}
processing_threads.clear ();
port = 0;
}
void nano::network::run_processing ()
{
try
{
// TODO: Move responsibility of packet queuing and processing to the message_processor class
tcp_channels.process_messages ();
}
catch (boost::system::error_code & ec)
{
node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
release_assert (false);
}
catch (std::error_code & ec)
{
node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
release_assert (false);
}
catch (std::runtime_error & err)
{
node.logger.critical (nano::log::type::network, "Error: {}", err.what ());
release_assert (false);
}
catch (...)
{
node.logger.critical (nano::log::type::network, "Unknown error");
release_assert (false);
}
}

View file

@ -130,6 +130,7 @@ public: // Handshake
nano::node_id_handshake::response_payload prepare_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) const;
private:
void run_processing ();
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
private: // Dependencies
@ -139,18 +140,20 @@ public:
nano::networks const id;
nano::syn_cookies syn_cookies;
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
nano::peer_exclusion excluded_peers;
nano::tcp_message_manager tcp_message_manager;
nano::network_filter publish_filter;
nano::transport::tcp_channels tcp_channels;
std::atomic<uint16_t> port{ 0 };
std::function<void ()> disconnect_observer;
public: // Callbacks
std::function<void ()> disconnect_observer{ [] () {} };
// Called when a new channel is observed
std::function<void (std::shared_ptr<nano::transport::channel>)> channel_observer;
std::function<void (std::shared_ptr<nano::transport::channel>)> channel_observer{ [] (auto) {} };
private:
std::atomic<bool> stopped{ false };
std::vector<boost::thread> processing_threads; // Using boost::thread to enable increased stack size
public:
static unsigned const broadcast_interval_ms = 10;