Dedicated thread for keepalives
This commit is contained in:
parent
fa84f7b381
commit
b67d125896
5 changed files with 38 additions and 15 deletions
|
|
@ -70,6 +70,7 @@ enum class detail : uint8_t
|
|||
ok,
|
||||
loop,
|
||||
loop_cleanup,
|
||||
loop_keepalive,
|
||||
total,
|
||||
process,
|
||||
processed,
|
||||
|
|
|
|||
|
|
@ -112,6 +112,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
|
|||
case nano::thread_role::name::network_cleanup:
|
||||
thread_role_name_string = "Net cleanup";
|
||||
break;
|
||||
case nano::thread_role::name::network_keepalive:
|
||||
thread_role_name_string = "Net keepalive";
|
||||
break;
|
||||
default:
|
||||
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,6 +46,7 @@ enum class name
|
|||
local_block_broadcasting,
|
||||
rep_tiers,
|
||||
network_cleanup,
|
||||
network_keepalive,
|
||||
};
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ nano::network::~network ()
|
|||
// All threads must be stopped before this destructor
|
||||
debug_assert (processing_threads.empty ());
|
||||
debug_assert (!cleanup_thread.joinable ());
|
||||
debug_assert (!keepalive_thread.joinable ());
|
||||
}
|
||||
|
||||
void nano::network::start ()
|
||||
|
|
@ -43,7 +44,10 @@ void nano::network::start ()
|
|||
run_cleanup ();
|
||||
});
|
||||
|
||||
ongoing_keepalive ();
|
||||
keepalive_thread = std::thread ([this] () {
|
||||
nano::thread_role::set (nano::thread_role::name::network_keepalive);
|
||||
run_keepalive ();
|
||||
});
|
||||
|
||||
if (!node.flags.disable_tcp_realtime)
|
||||
{
|
||||
|
|
@ -77,6 +81,10 @@ void nano::network::stop ()
|
|||
}
|
||||
processing_threads.clear ();
|
||||
|
||||
if (keepalive_thread.joinable ())
|
||||
{
|
||||
keepalive_thread.join ();
|
||||
}
|
||||
if (cleanup_thread.joinable ())
|
||||
{
|
||||
cleanup_thread.join ();
|
||||
|
|
@ -142,6 +150,28 @@ void nano::network::run_cleanup ()
|
|||
}
|
||||
}
|
||||
|
||||
void nano::network::run_keepalive ()
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
while (!stopped)
|
||||
{
|
||||
condition.wait_for (lock, node.network_params.network.keepalive_period);
|
||||
lock.unlock ();
|
||||
|
||||
if (stopped)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_keepalive);
|
||||
|
||||
flood_keepalive (0.75f);
|
||||
flood_keepalive_self (0.25f);
|
||||
|
||||
lock.lock ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::network::send_keepalive (std::shared_ptr<nano::transport::channel> const & channel_a)
|
||||
{
|
||||
nano::keepalive message{ node.network_params.network };
|
||||
|
|
@ -536,19 +566,6 @@ void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutof
|
|||
}
|
||||
}
|
||||
|
||||
void nano::network::ongoing_keepalive ()
|
||||
{
|
||||
flood_keepalive (0.75f);
|
||||
flood_keepalive_self (0.25f);
|
||||
std::weak_ptr<nano::node> node_w (node.shared ());
|
||||
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () {
|
||||
if (auto node_l = node_w.lock ())
|
||||
{
|
||||
node_l->network.ongoing_keepalive ();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
std::size_t nano::network::size () const
|
||||
{
|
||||
return tcp_channels.size ();
|
||||
|
|
|
|||
|
|
@ -114,7 +114,6 @@ public:
|
|||
nano::tcp_endpoint bootstrap_peer ();
|
||||
nano::endpoint endpoint () const;
|
||||
void cleanup (std::chrono::steady_clock::time_point const & cutoff);
|
||||
void ongoing_keepalive ();
|
||||
std::size_t size () const;
|
||||
float size_sqrt () const;
|
||||
bool empty () const;
|
||||
|
|
@ -132,6 +131,7 @@ public: // Handshake
|
|||
private:
|
||||
void run_processing ();
|
||||
void run_cleanup ();
|
||||
void run_keepalive ();
|
||||
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
|
||||
|
||||
private: // Dependencies
|
||||
|
|
@ -158,6 +158,7 @@ private:
|
|||
nano::condition_variable condition;
|
||||
std::vector<boost::thread> processing_threads; // Using boost::thread to enable increased stack size
|
||||
std::thread cleanup_thread;
|
||||
std::thread keepalive_thread;
|
||||
|
||||
public:
|
||||
static unsigned const broadcast_interval_ms = 10;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue