Moving bootstrap_pull_client back in to a separate object.

This commit is contained in:
clemahieu 2017-08-08 20:11:14 -05:00
commit a9c83fef63
2 changed files with 66 additions and 87 deletions

View file

@ -151,20 +151,14 @@ rai::bootstrap_client::bootstrap_client (std::shared_ptr <rai::node> node_a, std
node (node_a),
attempt (attempt_a),
socket (node_a->network.service),
pull_client (*this),
endpoint (endpoint_a),
timeout (node_a->network.service)
{
++attempt->connections;
}
rai::bootstrap_client::~bootstrap_client ()
{
if (!pull_client.pull.account.is_zero ())
{
// If this connection is ending and request_account hasn't been cleared it didn't finish, requeue
attempt->requeue_pull (pull_client.pull);
--attempt->pulling;
}
--attempt->connections;
}
@ -264,7 +258,7 @@ std::shared_ptr <rai::bootstrap_client> rai::bootstrap_client::shared ()
return shared_from_this ();
}
rai::frontier_req_client::frontier_req_client (std::shared_ptr <rai::bootstrap_client> const & connection_a) :
rai::frontier_req_client::frontier_req_client (std::shared_ptr <rai::bootstrap_client> connection_a) :
connection (connection_a),
current (0),
count (0),
@ -443,9 +437,8 @@ void rai::frontier_req_client::next (MDB_txn * transaction_a)
}
}
rai::bulk_pull_client::bulk_pull_client (rai::bootstrap_client & connection_a) :
connection (connection_a),
account_count (0)
rai::bulk_pull_client::bulk_pull_client (std::shared_ptr <rai::bootstrap_client> connection_a) :
connection (connection_a)
{
}
@ -465,103 +458,112 @@ void rai::bulk_pull_client::request (rai::pull_info const & pull_a)
rai::vectorstream stream (*buffer);
req.serialize (stream);
}
if (connection.node->config.logging.bulk_pull_logging ())
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection.node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection.endpoint);
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection->endpoint);
}
else if (connection.node->config.logging.network_logging () && account_count % 256 == 0)
else if (connection->node->config.logging.network_logging () && connection->attempt->account_count++ % 256 == 0)
{
BOOST_LOG (connection.node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection.endpoint);
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection->endpoint);
}
++account_count;
auto connection_l (connection.shared ());
connection.start_timeout ();
boost::asio::async_write (connection.socket, boost::asio::buffer (buffer->data (), buffer->size ()), [connection_l, buffer] (boost::system::error_code const & ec, size_t size_a)
auto this_l (shared_from_this ());
connection->start_timeout ();
boost::asio::async_write (connection->socket, boost::asio::buffer (buffer->data (), buffer->size ()), [this_l, buffer] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
this_l->connection->stop_timeout ();
if (!ec)
{
connection_l->pull_client.receive_block ();
this_l->receive_block ();
}
else
{
BOOST_LOG (connection_l->node->log) << boost::str (boost::format ("Error sending bulk pull request %1% to %2%") % ec.message () % connection_l->endpoint);
BOOST_LOG (this_l->connection->node->log) << boost::str (boost::format ("Error sending bulk pull request %1% to %2%") % ec.message () % this_l->connection->endpoint);
}
});
}
void rai::bulk_pull_client::receive_block ()
{
auto connection_l (connection.shared ());
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data (), 1), [connection_l] (boost::system::error_code const & ec, size_t size_a)
auto this_l (shared_from_this ());
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data (), 1), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
this_l->connection->stop_timeout ();
if (!ec)
{
connection_l->pull_client.received_type ();
this_l->received_type ();
}
else
{
BOOST_LOG (connection_l->node->log) << boost::str (boost::format ("Error receiving block type %1%") % ec.message ());
BOOST_LOG (this_l->connection->node->log) << boost::str (boost::format ("Error receiving block type %1%") % ec.message ());
}
});
}
void rai::bulk_pull_client::received_type ()
{
auto connection_l (connection.shared ());
rai::block_type type (static_cast <rai::block_type> (connection.receive_buffer [0]));
auto this_l (shared_from_this ());
rai::block_type type (static_cast <rai::block_type> (connection->receive_buffer [0]));
switch (type)
{
case rai::block_type::send:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::send_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::send_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
this_l->connection->stop_timeout ();
this_l->received_block (ec, size_a);
});
break;
}
case rai::block_type::receive:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::receive_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::receive_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
this_l->connection->stop_timeout ();
this_l->received_block (ec, size_a);
});
break;
}
case rai::block_type::open:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::open_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::open_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
this_l->connection->stop_timeout ();
this_l->received_block (ec, size_a);
});
break;
}
case rai::block_type::change:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::change_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::change_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
this_l->connection->stop_timeout ();
this_l->received_block (ec, size_a);
});
break;
}
case rai::block_type::not_a_block:
{
connection.completed_pull ();
if (expected == pull.end)
{
pull = rai::pull_info ();
--connection->attempt->pulling;
connection->work ();
}
else
{
connection->attempt->requeue_pull (pull);
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % connection->endpoint);
}
break;
}
default:
{
BOOST_LOG (connection.node->log) << boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast <int> (type));
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast <int> (type));
break;
}
}
@ -571,22 +573,22 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec
{
if (!ec)
{
rai::bufferstream stream (connection.receive_buffer.data (), 1 + size_a);
rai::bufferstream stream (connection->receive_buffer.data (), 1 + size_a);
std::shared_ptr <rai::block> block (rai::deserialize_block (stream));
if (block != nullptr)
{
auto hash (block->hash ());
if (connection.node->config.logging.bulk_pull_logging ())
if (connection->node->config.logging.bulk_pull_logging ())
{
std::string block_l;
block->serialize_json (block_l);
BOOST_LOG (connection.node->log) << boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l);
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l);
}
if (hash == expected)
{
expected = block->previous ();
}
auto attempt (connection.attempt);
auto attempt (connection->attempt);
// Process the block asynchronously from making the next network requests since this is a potentially long operation
// Hold a reference to the current attempt so we don't start another one while blocks are being processed.
attempt->node->background ([attempt, block] ()
@ -617,12 +619,12 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec
}
else
{
BOOST_LOG (connection.node->log) << "Error deserializing block received from pull request";
BOOST_LOG (connection->node->log) << "Error deserializing block received from pull request";
}
}
else
{
BOOST_LOG (connection.node->log) << boost::str (boost::format ("Error bulk receiving block: %1%") % ec.message ());
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Error bulk receiving block: %1%") % ec.message ());
}
}
@ -776,7 +778,8 @@ rai::bootstrap_attempt::bootstrap_attempt (std::shared_ptr <rai::node> node_a) :
connections (0),
pulling (0),
node (node_a),
state (rai::attempt_state::starting)
state (rai::attempt_state::starting),
account_count (0)
{
BOOST_LOG (node->log) << "Starting bootstrap attempt";
}
@ -789,7 +792,7 @@ rai::bootstrap_attempt::~bootstrap_attempt ()
void rai::bootstrap_attempt::populate_connections ()
{
if (++connections < node->config.bootstrap_connections)
if (connections < node->config.bootstrap_connections)
{
auto peer (node->peers.bootstrap_peer ());
if (peer != rai::endpoint ())
@ -797,14 +800,6 @@ void rai::bootstrap_attempt::populate_connections ()
auto client (std::make_shared <rai::bootstrap_client> (node, shared_from_this (), rai::tcp_endpoint (peer.address (), peer.port ())));
client->run ();
}
else
{
--connections;
}
}
else
{
--connections;
}
std::weak_ptr <rai::bootstrap_attempt> this_w (shared_from_this ());
switch (state)
@ -850,21 +845,6 @@ void rai::bootstrap_client::completed_frontier_request ()
work ();
}
void rai::bootstrap_client::completed_pull ()
{
if (pull_client.expected == pull_client.pull.end)
{
pull_client.pull = rai::pull_info ();
--attempt->pulling;
work ();
}
else
{
attempt->requeue_pull (pull_client.pull);
BOOST_LOG (node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % endpoint);
}
}
void rai::bootstrap_client::completed_pulls ()
{
BOOST_LOG (node->log) << "Completed pulls";
@ -914,7 +894,8 @@ void rai::bootstrap_client::work ()
attempt->pulls.pop_front ();
++attempt->pulling;
lock.unlock ();
pull_client.request (pull);
auto pull_client (std::make_shared <rai::bulk_pull_client> (shared_from_this ()));
pull_client->request (pull);
}
else
{

View file

@ -82,11 +82,12 @@ public:
rai::attempt_state state;
std::unordered_set <rai::endpoint> attempted;
std::mutex mutex;
std::atomic <unsigned> account_count;
};
class frontier_req_client : public std::enable_shared_from_this <rai::frontier_req_client>
{
public:
frontier_req_client (std::shared_ptr <rai::bootstrap_client> const &);
frontier_req_client (std::shared_ptr <rai::bootstrap_client>);
~frontier_req_client ();
void run ();
void receive_frontier ();
@ -100,18 +101,17 @@ public:
unsigned count;
std::chrono::system_clock::time_point next_report;
};
class bulk_pull_client
class bulk_pull_client : public std::enable_shared_from_this <rai::bulk_pull_client>
{
public:
bulk_pull_client (rai::bootstrap_client &);
bulk_pull_client (std::shared_ptr <rai::bootstrap_client>);
~bulk_pull_client ();
void request (rai::pull_info const &);
void receive_block ();
void received_type ();
void received_block (boost::system::error_code const &, size_t);
rai::block_hash first ();
rai::bootstrap_client & connection;
size_t account_count;
std::shared_ptr <rai::bootstrap_client> connection;
rai::block_hash expected;
rai::pull_info pull;
};
@ -126,7 +126,6 @@ public:
void poll ();
void completed_frontier_request ();
void sent_request (boost::system::error_code const &, size_t);
void completed_pull ();
void completed_pulls ();
void completed_pushes ();
std::shared_ptr <rai::bootstrap_client> shared ();
@ -136,7 +135,6 @@ public:
std::shared_ptr <rai::bootstrap_attempt> attempt;
boost::asio::ip::tcp::socket socket;
std::array <uint8_t, 200> receive_buffer;
rai::bulk_pull_client pull_client;
rai::tcp_endpoint endpoint;
boost::asio::deadline_timer timeout;
};