From a8aaa351e4bd9a645df23d88abd2db1d9c7461e9 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Sat, 1 Jul 2017 03:59:35 -0500 Subject: [PATCH] Adding timeouts to bootstrap_client. --- rai/node/bootstrap.cpp | 63 +++++++++++++++++++++++++++++++++--------- rai/node/bootstrap.hpp | 3 ++ 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/rai/node/bootstrap.cpp b/rai/node/bootstrap.cpp index b154a005..dacf6a71 100755 --- a/rai/node/bootstrap.cpp +++ b/rai/node/bootstrap.cpp @@ -153,7 +153,8 @@ attempt (attempt_a), socket (node_a->network.service), connected (false), pull_client (*this), -endpoint (endpoint_a) +endpoint (endpoint_a), +timeout (node_a->network.service) { } @@ -162,11 +163,39 @@ rai::bootstrap_client::~bootstrap_client () attempt->connection_ending (this); } +void rai::bootstrap_client::start_timeout () +{ + timeout.expires_from_now (boost::posix_time::seconds (15)); + std::weak_ptr this_w (shared ()); + timeout.async_wait ([this_w] (boost::system::error_code const & ec) + { + if (ec != boost::asio::error::operation_aborted) + { + auto this_l (this_w.lock ()); + if (this_l != nullptr) + { + if (!this_l->connected) + { + this_l->socket.close (); + } + } + } + }); +} + +void rai::bootstrap_client::stop_timeout () +{ + auto killed (timeout.expires_from_now ()); + (void) killed; +} + void rai::bootstrap_client::run () { auto this_l (shared_from_this ()); + start_timeout (); socket.async_connect (endpoint, [this_l] (boost::system::error_code const & ec) { + this_l->stop_timeout (); if (!ec) { BOOST_LOG (this_l->node->log) << boost::str (boost::format ("Connection established to %1%") % this_l->endpoint); @@ -190,18 +219,6 @@ void rai::bootstrap_client::run () } } }); - std::weak_ptr this_w (this_l); - node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds(10), [this_w] () - { - auto this_l (this_w.lock ()); - if (this_l != nullptr) - { - if (!this_l->connected) - { - this_l->socket.close (); - } - } - }); } void rai::bootstrap_client::frontier_request () @@ -216,8 +233,10 @@ void rai::bootstrap_client::frontier_request () request->serialize (stream); } auto this_l (shared_from_this ()); + start_timeout (); boost::asio::async_write (socket, boost::asio::buffer (send_buffer->data (), send_buffer->size ()), [this_l, send_buffer] (boost::system::error_code const & ec, size_t size_a) { + this_l->stop_timeout (); this_l->sent_request (ec, size_a); }); } @@ -262,8 +281,10 @@ rai::frontier_req_client::~frontier_req_client () void rai::frontier_req_client::receive_frontier () { auto this_l (shared_from_this ()); + connection->start_timeout (); boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data (), sizeof (rai::uint256_union) + sizeof (rai::uint256_union)), [this_l] (boost::system::error_code const & ec, size_t size_a) { + this_l->connection->stop_timeout (); this_l->received_frontier (ec, size_a); }); } @@ -412,8 +433,10 @@ void rai::bulk_pull_client::request (rai::pull_info const & pull_a) } ++account_count; auto connection_l (connection.shared ()); + connection.start_timeout (); boost::asio::async_write (connection.socket, boost::asio::buffer (buffer->data (), buffer->size ()), [connection_l, buffer] (boost::system::error_code const & ec, size_t size_a) { + connection_l->stop_timeout (); if (!ec) { connection_l->pull_client.receive_block (); @@ -428,8 +451,10 @@ void rai::bulk_pull_client::request (rai::pull_info const & pull_a) void rai::bulk_pull_client::receive_block () { auto connection_l (connection.shared ()); + connection.start_timeout (); boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data (), 1), [connection_l] (boost::system::error_code const & ec, size_t size_a) { + connection_l->stop_timeout (); if (!ec) { connection_l->pull_client.received_type (); @@ -449,32 +474,40 @@ void rai::bulk_pull_client::received_type () { case rai::block_type::send: { + connection.start_timeout (); boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::send_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) { + connection_l->stop_timeout (); connection_l->pull_client.received_block (ec, size_a); }); break; } case rai::block_type::receive: { + connection.start_timeout (); boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::receive_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) { + connection_l->stop_timeout (); connection_l->pull_client.received_block (ec, size_a); }); break; } case rai::block_type::open: { + connection.start_timeout (); boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::open_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) { + connection_l->stop_timeout (); connection_l->pull_client.received_block (ec, size_a); }); break; } case rai::block_type::change: { + connection.start_timeout (); boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::change_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) { + connection_l->stop_timeout (); connection_l->pull_client.received_block (ec, size_a); }); break; @@ -585,8 +618,10 @@ void rai::bulk_push_client::start () message.serialize (stream); } auto this_l (shared_from_this ()); + connection->start_timeout (); boost::asio::async_write (connection->socket, boost::asio::buffer (buffer->data (), buffer->size ()), [this_l, buffer] (boost::system::error_code const & ec, size_t size_a) { + this_l->connection->stop_timeout (); rai::transaction transaction (this_l->connection->node->store.environment, nullptr, true); if (!ec) { @@ -652,8 +687,10 @@ void rai::bulk_push_client::push_block (rai::block const & block_a) rai::serialize_block (stream, block_a); } auto this_l (shared_from_this ()); + connection->start_timeout (); boost::asio::async_write (connection->socket, boost::asio::buffer (buffer->data (), buffer->size ()), [this_l, buffer] (boost::system::error_code const & ec, size_t size_a) { + this_l->connection->stop_timeout (); if (!ec) { rai::transaction transaction (this_l->connection->node->store.environment, nullptr, true); diff --git a/rai/node/bootstrap.hpp b/rai/node/bootstrap.hpp index a14bc19e..20dbfcc4 100644 --- a/rai/node/bootstrap.hpp +++ b/rai/node/bootstrap.hpp @@ -131,6 +131,8 @@ public: void frontier_request (); void sent_request (boost::system::error_code const &, size_t); std::shared_ptr shared (); + void start_timeout (); + void stop_timeout (); std::shared_ptr node; std::shared_ptr attempt; boost::asio::ip::tcp::socket socket; @@ -138,6 +140,7 @@ public: bool connected; rai::bulk_pull_client pull_client; rai::tcp_endpoint endpoint; + boost::asio::deadline_timer timeout; }; class bulk_push_client : public std::enable_shared_from_this {