Dedicated thread for periodic cleanup
This commit is contained in:
		
					parent
					
						
							
								09a94096b6
							
						
					
				
			
			
				commit
				
					
						98b3de8142
					
				
			
		
					 5 changed files with 55 additions and 18 deletions
				
			
		| 
						 | 
				
			
			@ -17,6 +17,7 @@ enum class type : uint8_t
 | 
			
		|||
	ledger,
 | 
			
		||||
	rollback,
 | 
			
		||||
	bootstrap,
 | 
			
		||||
	network,
 | 
			
		||||
	tcp_server,
 | 
			
		||||
	vote,
 | 
			
		||||
	election,
 | 
			
		||||
| 
						 | 
				
			
			@ -67,6 +68,7 @@ enum class detail : uint8_t
 | 
			
		|||
	// common
 | 
			
		||||
	ok,
 | 
			
		||||
	loop,
 | 
			
		||||
	loop_cleanup,
 | 
			
		||||
	total,
 | 
			
		||||
	process,
 | 
			
		||||
	processed,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -109,6 +109,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
 | 
			
		|||
		case nano::thread_role::name::rep_tiers:
 | 
			
		||||
			thread_role_name_string = "Rep tiers";
 | 
			
		||||
			break;
 | 
			
		||||
		case nano::thread_role::name::network_cleanup:
 | 
			
		||||
			thread_role_name_string = "Net cleanup";
 | 
			
		||||
			break;
 | 
			
		||||
		default:
 | 
			
		||||
			debug_assert (false && "nano::thread_role::get_string unhandled thread role");
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -45,6 +45,7 @@ enum class name
 | 
			
		|||
	rep_crawler,
 | 
			
		||||
	local_block_broadcasting,
 | 
			
		||||
	rep_tiers,
 | 
			
		||||
	network_cleanup,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -9,6 +9,8 @@
 | 
			
		|||
 | 
			
		||||
#include <boost/format.hpp>
 | 
			
		||||
 | 
			
		||||
using namespace std::chrono_literals;
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * network
 | 
			
		||||
 */
 | 
			
		||||
| 
						 | 
				
			
			@ -31,13 +33,17 @@ nano::network::~network ()
 | 
			
		|||
{
 | 
			
		||||
	// All threads must be stopped before this destructor
 | 
			
		||||
	debug_assert (processing_threads.empty ());
 | 
			
		||||
	debug_assert (!cleanup_thread.joinable ());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::network::start ()
 | 
			
		||||
{
 | 
			
		||||
	if (!node.flags.disable_connection_cleanup)
 | 
			
		||||
	{
 | 
			
		||||
		ongoing_cleanup ();
 | 
			
		||||
		cleanup_thread = std::thread ([this] () {
 | 
			
		||||
			nano::thread_role::set (nano::thread_role::name::network_cleanup);
 | 
			
		||||
			run_cleanup ();
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ongoing_syn_cookie_cleanup ();
 | 
			
		||||
| 
						 | 
				
			
			@ -59,7 +65,11 @@ void nano::network::start ()
 | 
			
		|||
 | 
			
		||||
void nano::network::stop ()
 | 
			
		||||
{
 | 
			
		||||
	stopped = true;
 | 
			
		||||
	{
 | 
			
		||||
		nano::lock_guard<nano::mutex> lock{ mutex };
 | 
			
		||||
		stopped = true;
 | 
			
		||||
	}
 | 
			
		||||
	condition.notify_all ();
 | 
			
		||||
 | 
			
		||||
	tcp_channels.stop ();
 | 
			
		||||
	resolver.cancel ();
 | 
			
		||||
| 
						 | 
				
			
			@ -71,6 +81,11 @@ void nano::network::stop ()
 | 
			
		|||
	}
 | 
			
		||||
	processing_threads.clear ();
 | 
			
		||||
 | 
			
		||||
	if (cleanup_thread.joinable ())
 | 
			
		||||
	{
 | 
			
		||||
		cleanup_thread.join ();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	port = 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -103,6 +118,28 @@ void nano::network::run_processing ()
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::network::run_cleanup ()
 | 
			
		||||
{
 | 
			
		||||
	nano::unique_lock<nano::mutex> lock{ mutex };
 | 
			
		||||
	while (!stopped)
 | 
			
		||||
	{
 | 
			
		||||
		condition.wait_for (lock, node.network_params.network.is_dev_network () ? 1s : 5s);
 | 
			
		||||
		lock.unlock ();
 | 
			
		||||
 | 
			
		||||
		if (stopped)
 | 
			
		||||
		{
 | 
			
		||||
			return;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_cleanup);
 | 
			
		||||
 | 
			
		||||
		auto const cutoff = std::chrono::steady_clock::now () - node.network_params.network.cleanup_cutoff ();
 | 
			
		||||
		cleanup (cutoff);
 | 
			
		||||
 | 
			
		||||
		lock.lock ();
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::network::send_keepalive (std::shared_ptr<nano::transport::channel> const & channel_a)
 | 
			
		||||
{
 | 
			
		||||
	nano::keepalive message{ node.network_params.network };
 | 
			
		||||
| 
						 | 
				
			
			@ -493,27 +530,18 @@ nano::endpoint nano::network::endpoint () const
 | 
			
		|||
	return nano::endpoint (boost::asio::ip::address_v6::loopback (), port);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff_a)
 | 
			
		||||
void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff)
 | 
			
		||||
{
 | 
			
		||||
	tcp_channels.purge (cutoff_a);
 | 
			
		||||
	node.logger.debug (nano::log::type::network, "Performing cleanup, cutoff: {}s", nano::log::seconds_delta (cutoff));
 | 
			
		||||
 | 
			
		||||
	tcp_channels.purge (cutoff);
 | 
			
		||||
 | 
			
		||||
	if (node.network.empty ())
 | 
			
		||||
	{
 | 
			
		||||
		disconnect_observer ();
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::network::ongoing_cleanup ()
 | 
			
		||||
{
 | 
			
		||||
	cleanup (std::chrono::steady_clock::now () - node.network_params.network.cleanup_cutoff ());
 | 
			
		||||
	std::weak_ptr<nano::node> node_w (node.shared ());
 | 
			
		||||
	node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [node_w] () {
 | 
			
		||||
		if (auto node_l = node_w.lock ())
 | 
			
		||||
		{
 | 
			
		||||
			node_l->network.ongoing_cleanup ();
 | 
			
		||||
		}
 | 
			
		||||
	});
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::network::ongoing_syn_cookie_cleanup ()
 | 
			
		||||
{
 | 
			
		||||
	syn_cookies.purge (std::chrono::steady_clock::now () - nano::transport::syn_cookie_cutoff);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -111,8 +111,7 @@ public:
 | 
			
		|||
	// Get the next peer for attempting a tcp bootstrap connection
 | 
			
		||||
	nano::tcp_endpoint bootstrap_peer ();
 | 
			
		||||
	nano::endpoint endpoint () const;
 | 
			
		||||
	void cleanup (std::chrono::steady_clock::time_point const &);
 | 
			
		||||
	void ongoing_cleanup ();
 | 
			
		||||
	void cleanup (std::chrono::steady_clock::time_point const & cutoff);
 | 
			
		||||
	void ongoing_syn_cookie_cleanup ();
 | 
			
		||||
	void ongoing_keepalive ();
 | 
			
		||||
	std::size_t size () const;
 | 
			
		||||
| 
						 | 
				
			
			@ -131,6 +130,7 @@ public: // Handshake
 | 
			
		|||
 | 
			
		||||
private:
 | 
			
		||||
	void run_processing ();
 | 
			
		||||
	void run_cleanup ();
 | 
			
		||||
	void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
 | 
			
		||||
 | 
			
		||||
private: // Dependencies
 | 
			
		||||
| 
						 | 
				
			
			@ -153,7 +153,10 @@ public: // Callbacks
 | 
			
		|||
 | 
			
		||||
private:
 | 
			
		||||
	std::atomic<bool> stopped{ false };
 | 
			
		||||
	mutable nano::mutex mutex;
 | 
			
		||||
	nano::condition_variable condition;
 | 
			
		||||
	std::vector<boost::thread> processing_threads; // Using boost::thread to enable increased stack size
 | 
			
		||||
	std::thread cleanup_thread;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
	static unsigned const broadcast_interval_ms = 10;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue