Adding timeouts to bootstrap_client.

This commit is contained in:
clemahieu 2017-07-01 03:59:35 -05:00
commit a8aaa351e4
2 changed files with 53 additions and 13 deletions

View file

@ -153,7 +153,8 @@ attempt (attempt_a),
socket (node_a->network.service),
connected (false),
pull_client (*this),
endpoint (endpoint_a)
endpoint (endpoint_a),
timeout (node_a->network.service)
{
}
@ -162,11 +163,39 @@ rai::bootstrap_client::~bootstrap_client ()
attempt->connection_ending (this);
}
void rai::bootstrap_client::start_timeout ()
{
timeout.expires_from_now (boost::posix_time::seconds (15));
std::weak_ptr <rai::bootstrap_client> this_w (shared ());
timeout.async_wait ([this_w] (boost::system::error_code const & ec)
{
if (ec != boost::asio::error::operation_aborted)
{
auto this_l (this_w.lock ());
if (this_l != nullptr)
{
if (!this_l->connected)
{
this_l->socket.close ();
}
}
}
});
}
void rai::bootstrap_client::stop_timeout ()
{
auto killed (timeout.expires_from_now ());
(void) killed;
}
void rai::bootstrap_client::run ()
{
auto this_l (shared_from_this ());
start_timeout ();
socket.async_connect (endpoint, [this_l] (boost::system::error_code const & ec)
{
this_l->stop_timeout ();
if (!ec)
{
BOOST_LOG (this_l->node->log) << boost::str (boost::format ("Connection established to %1%") % this_l->endpoint);
@ -190,18 +219,6 @@ void rai::bootstrap_client::run ()
}
}
});
std::weak_ptr <rai::bootstrap_client> this_w (this_l);
node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds(10), [this_w] ()
{
auto this_l (this_w.lock ());
if (this_l != nullptr)
{
if (!this_l->connected)
{
this_l->socket.close ();
}
}
});
}
void rai::bootstrap_client::frontier_request ()
@ -216,8 +233,10 @@ void rai::bootstrap_client::frontier_request ()
request->serialize (stream);
}
auto this_l (shared_from_this ());
start_timeout ();
boost::asio::async_write (socket, boost::asio::buffer (send_buffer->data (), send_buffer->size ()), [this_l, send_buffer] (boost::system::error_code const & ec, size_t size_a)
{
this_l->stop_timeout ();
this_l->sent_request (ec, size_a);
});
}
@ -262,8 +281,10 @@ rai::frontier_req_client::~frontier_req_client ()
void rai::frontier_req_client::receive_frontier ()
{
auto this_l (shared_from_this ());
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data (), sizeof (rai::uint256_union) + sizeof (rai::uint256_union)), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
this_l->connection->stop_timeout ();
this_l->received_frontier (ec, size_a);
});
}
@ -412,8 +433,10 @@ void rai::bulk_pull_client::request (rai::pull_info const & pull_a)
}
++account_count;
auto connection_l (connection.shared ());
connection.start_timeout ();
boost::asio::async_write (connection.socket, boost::asio::buffer (buffer->data (), buffer->size ()), [connection_l, buffer] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
if (!ec)
{
connection_l->pull_client.receive_block ();
@ -428,8 +451,10 @@ void rai::bulk_pull_client::request (rai::pull_info const & pull_a)
void rai::bulk_pull_client::receive_block ()
{
auto connection_l (connection.shared ());
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data (), 1), [connection_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
if (!ec)
{
connection_l->pull_client.received_type ();
@ -449,32 +474,40 @@ void rai::bulk_pull_client::received_type ()
{
case rai::block_type::send:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::send_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
});
break;
}
case rai::block_type::receive:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::receive_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
});
break;
}
case rai::block_type::open:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::open_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
});
break;
}
case rai::block_type::change:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::change_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
});
break;
@ -585,8 +618,10 @@ void rai::bulk_push_client::start ()
message.serialize (stream);
}
auto this_l (shared_from_this ());
connection->start_timeout ();
boost::asio::async_write (connection->socket, boost::asio::buffer (buffer->data (), buffer->size ()), [this_l, buffer] (boost::system::error_code const & ec, size_t size_a)
{
this_l->connection->stop_timeout ();
rai::transaction transaction (this_l->connection->node->store.environment, nullptr, true);
if (!ec)
{
@ -652,8 +687,10 @@ void rai::bulk_push_client::push_block (rai::block const & block_a)
rai::serialize_block (stream, block_a);
}
auto this_l (shared_from_this ());
connection->start_timeout ();
boost::asio::async_write (connection->socket, boost::asio::buffer (buffer->data (), buffer->size ()), [this_l, buffer] (boost::system::error_code const & ec, size_t size_a)
{
this_l->connection->stop_timeout ();
if (!ec)
{
rai::transaction transaction (this_l->connection->node->store.environment, nullptr, true);

View file

@ -131,6 +131,8 @@ public:
void frontier_request ();
void sent_request (boost::system::error_code const &, size_t);
std::shared_ptr <rai::bootstrap_client> shared ();
void start_timeout ();
void stop_timeout ();
std::shared_ptr <rai::node> node;
std::shared_ptr <rai::bootstrap_attempt> attempt;
boost::asio::ip::tcp::socket socket;
@ -138,6 +140,7 @@ public:
bool connected;
rai::bulk_pull_client pull_client;
rai::tcp_endpoint endpoint;
boost::asio::deadline_timer timeout;
};
class bulk_push_client : public std::enable_shared_from_this <rai::bulk_push_client>
{