Reworking bootstrap work process.

This commit is contained in:
clemahieu 2017-07-03 00:19:44 -05:00
commit 77abcc9304
2 changed files with 145 additions and 209 deletions

View file

@ -151,7 +151,6 @@ rai::bootstrap_client::bootstrap_client (std::shared_ptr <rai::node> node_a, std
node (node_a),
attempt (attempt_a),
socket (node_a->network.service),
connected (false),
pull_client (*this),
endpoint (endpoint_a),
timeout (node_a->network.service)
@ -160,7 +159,13 @@ timeout (node_a->network.service)
rai::bootstrap_client::~bootstrap_client ()
{
attempt->connection_ending (this);
if (!pull_client.pull.account.is_zero ())
{
// If this connection is ending and request_account hasn't been cleared it didn't finish, requeue
attempt->requeue_pull (pull_client.pull);
--attempt->pulling;
}
--attempt->connections;
}
void rai::bootstrap_client::start_timeout ()
@ -174,10 +179,7 @@ void rai::bootstrap_client::start_timeout ()
auto this_l (this_w.lock ());
if (this_l != nullptr)
{
if (!this_l->connected)
{
this_l->socket.close ();
}
this_l->socket.close ();
}
}
});
@ -199,8 +201,7 @@ void rai::bootstrap_client::run ()
if (!ec)
{
BOOST_LOG (this_l->node->log) << boost::str (boost::format ("Connection established to %1%") % this_l->endpoint);
this_l->connected = true;
this_l->attempt->pool_connection (this_l);
this_l->work ();
}
else
{
@ -265,7 +266,8 @@ std::shared_ptr <rai::bootstrap_client> rai::bootstrap_client::shared ()
rai::frontier_req_client::frontier_req_client (std::shared_ptr <rai::bootstrap_client> const & connection_a) :
connection (connection_a),
current (0)
current (0),
count (0)
{
next ();
}
@ -319,6 +321,11 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons
rai::bufferstream latest_stream (connection->receive_buffer.data () + sizeof (rai::uint256_union), sizeof (rai::uint256_union));
auto error2 (rai::read (latest_stream, latest));
assert (!error2);
++count;
if (count % 16384 == 0)
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Received %1% frontiers from %2%") % std::to_string (count) % connection->socket.remote_endpoint ());
}
if (!account.is_zero ())
{
while (!current.is_zero () && current < account)
@ -353,7 +360,16 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons
else
{
// They know about a block we don't.
connection->attempt->pulls.push_back (rai::pull_info (account, latest, info.head));
rai::account landing ("059F68AAB29DE0D3A27443625C7EA9CDDB6517A8B76FE37727EF6A4D76832AD5");
rai::account faucet ("8E319CE6F3025E5B2DF66DA7AB1467FE48F1679C13DD43BFDB29FA2E9FC40D3B");
if (account != rai::genesis_account && account != landing && account != faucet)
{
connection->attempt->pulls.push_back (rai::pull_info (account, latest, info.head));
}
else
{
connection->attempt->pulls.push_front (rai::pull_info (account, latest, info.head));
}
}
}
next ();
@ -384,7 +400,7 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons
next ();
}
}
connection->attempt->completed_requests (connection);
connection->completed_frontier_request ();
}
}
else
@ -514,7 +530,7 @@ void rai::bulk_pull_client::received_type ()
}
case rai::block_type::not_a_block:
{
connection.attempt->completed_pull (connection.shared ());
connection.completed_pull ();
break;
}
default:
@ -675,7 +691,7 @@ void rai::bulk_push_client::send_finished ()
auto this_l (shared_from_this ());
async_write (connection->socket, boost::asio::buffer (buffer->data (), 1), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
this_l->connection->attempt->completed_pushes (this_l->connection);
this_l->connection->completed_pushes ();
});
}
@ -726,6 +742,8 @@ attempts (0)
}
rai::bootstrap_attempt::bootstrap_attempt (std::shared_ptr <rai::node> node_a) :
connections (0),
pulling (0),
node (node_a),
state (rai::attempt_state::starting)
{
@ -740,233 +758,154 @@ rai::bootstrap_attempt::~bootstrap_attempt ()
void rai::bootstrap_attempt::populate_connections ()
{
std::weak_ptr <rai::bootstrap_attempt> this_w (shared_from_this ());
std::shared_ptr <rai::bootstrap_client> client;
if (++connections < node->config.bootstrap_connections)
{
std::lock_guard <std::mutex> lock (mutex);
if (connecting.size () + active.size () + idle.size () < node->config.bootstrap_connections)
auto peer (node->peers.bootstrap_peer ());
if (peer != rai::endpoint ())
{
auto peer (node->peers.bootstrap_peer ());
if (peer != rai::endpoint ())
auto client (std::make_shared <rai::bootstrap_client> (node, shared_from_this (), rai::tcp_endpoint (peer.address (), peer.port ())));
client->run ();
}
else
{
--connections;
}
}
else
{
--connections;
}
std::weak_ptr <rai::bootstrap_attempt> this_w (shared_from_this ());
switch (state)
{
case rai::attempt_state::starting:
case rai::attempt_state::requesting_frontiers:
case rai::attempt_state::requesting_pulls:
node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [this_w] ()
{
client = start_connection (peer);
}
}
switch (state)
{
case rai::attempt_state::starting:
case rai::attempt_state::requesting_frontiers:
case rai::attempt_state::requesting_pulls:
node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (1), [this_w] ()
if (auto this_l = this_w.lock ())
{
if (auto this_l = this_w.lock ())
{
this_l->populate_connections ();
}
});
break;
default:
break;
}
this_l->populate_connections ();
}
});
break;
default:
break;
}
}
void rai::bootstrap_attempt::add_connection (rai::endpoint const & endpoint_a)
{
std::shared_ptr <rai::bootstrap_client> client;
{
std::lock_guard <std::mutex> lock (mutex);
client = start_connection (endpoint_a);
}
}
std::shared_ptr <rai::bootstrap_client> rai::bootstrap_attempt::start_connection (rai::endpoint const & endpoint_a)
{
assert (!mutex.try_lock ());
std::shared_ptr <rai::bootstrap_client> client;
if (attempted.find (endpoint_a) == attempted.end ())
{
attempted.insert (endpoint_a);
auto node_l (node->shared ());
client = std::make_shared <rai::bootstrap_client> (node_l, shared_from_this (), rai::tcp_endpoint (endpoint_a.address (), endpoint_a.port ()));
connecting [client.get ()] = client;
client->run ();
}
return client;
auto client (std::make_shared <rai::bootstrap_client> (node, shared_from_this (), rai::tcp_endpoint (endpoint_a.address (), endpoint_a.port ())));
client->run ();
}
void rai::bootstrap_attempt::stop ()
{
std::lock_guard <std::mutex> lock (mutex);
state = rai::attempt_state::complete;
for (auto i: connecting)
{
auto attempt (i.second.lock ());
if (attempt != nullptr)
{
attempt->socket.close ();
}
}
for (auto i: active)
{
auto attempt (i.second.lock ());
if (attempt != nullptr)
{
attempt->socket.close ();
}
}
idle.clear ();
}
void rai::bootstrap_attempt::pool_connection (std::shared_ptr <rai::bootstrap_client> client_a)
void rai::bootstrap_client::completed_frontier_request ()
{
{
std::lock_guard <std::mutex> lock (mutex);
auto erased_active (active.erase (client_a.get ()));
auto erased_connecting (connecting.erase (client_a.get ()));
assert (erased_active == 1 || erased_connecting == 1);
idle.push_back (client_a);
std::lock_guard <std::mutex> lock (attempt->mutex);
if (node->config.logging.network_logging ())
{
BOOST_LOG (node->log) << boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % attempt->pulls.size () % endpoint);
}
attempt->state = rai::attempt_state::requesting_pulls;
}
dispatch_work ();
work ();
}
void rai::bootstrap_attempt::connection_ending (rai::bootstrap_client * client_a)
void rai::bootstrap_client::completed_pull ()
{
if (client_a->node->network.on)
if (pull_client.expected == pull_client.pull.end)
{
std::lock_guard <std::mutex> lock (mutex);
if (!client_a->pull_client.pull.account.is_zero ())
{
// If this connection is ending and request_account hasn't been cleared it didn't finish, requeue
requeue_pull (client_a->pull_client.pull);
}
auto erased_connecting (connecting.erase (client_a));
auto erased_active (active.erase (client_a));
pull_client.pull = rai::pull_info ();
--attempt->pulling;
work ();
}
else
{
// If we're stopping, just exit
attempt->requeue_pull (pull_client.pull);
BOOST_LOG (node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % endpoint);
}
}
void rai::bootstrap_attempt::completed_requests (std::shared_ptr <rai::bootstrap_client> client_a)
{
{
std::lock_guard <std::mutex> lock (mutex);
if (node->config.logging.network_logging ())
{
BOOST_LOG (node->log) << boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % pulls.size () % client_a->endpoint);
}
state = rai::attempt_state::requesting_pulls;
}
pool_connection (client_a);
}
void rai::bootstrap_attempt::completed_pull (std::shared_ptr <rai::bootstrap_client> client_a)
{
auto repool (true);
{
std::lock_guard <std::mutex> lock (mutex);
if (client_a->pull_client.expected != client_a->pull_client.pull.end)
{
requeue_pull (client_a->pull_client.pull);
BOOST_LOG (node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % client_a->endpoint);
repool = false;
}
client_a->pull_client.pull = rai::pull_info ();
}
if (repool)
{
pool_connection (client_a);
}
}
void rai::bootstrap_attempt::completed_pulls (std::shared_ptr <rai::bootstrap_client> client_a)
void rai::bootstrap_client::completed_pulls ()
{
BOOST_LOG (node->log) << "Completed pulls";
assert (node->bootstrap_initiator.in_progress ());
auto pushes (std::make_shared <rai::bulk_push_client> (client_a));
auto pushes (std::make_shared <rai::bulk_push_client> (shared_from_this ()));
pushes->start ();
}
void rai::bootstrap_attempt::completed_pushes (std::shared_ptr <rai::bootstrap_client> client_a)
void rai::bootstrap_client::completed_pushes ()
{
std::vector <std::shared_ptr <rai::bootstrap_client>> discard;
std::lock_guard <std::mutex> lock (mutex);
state = rai::attempt_state::complete;
discard.swap (idle);
std::lock_guard <std::mutex> lock (attempt->mutex);
attempt->state = rai::attempt_state::complete;
}
void rai::bootstrap_attempt::dispatch_work ()
void rai::bootstrap_client::poll ()
{
std::function <void ()> action;
auto this_l (shared_from_this ());
attempt->node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (rai::rai_network == rai::rai_networks::rai_test_network ? 0 : 1), [this_l] ()
{
std::lock_guard <std::mutex> lock (mutex);
if (!idle.empty ())
{
// We have a connection we could do something with
auto connection (idle.back ());
idle.pop_back ();
switch (state)
this_l->work ();
});
}
void rai::bootstrap_client::work ()
{
auto poll_l (false);
std::unique_lock <std::mutex> lock (attempt->mutex);
switch (attempt->state)
{
case rai::attempt_state::starting:
attempt->state = rai::attempt_state::requesting_frontiers;
lock.unlock ();
if (this->node->config.logging.network_logging ())
{
case rai::attempt_state::starting:
state = rai::attempt_state::requesting_frontiers;
action = [connection, this] ()
{
if (this->node->config.logging.network_logging ())
{
BOOST_LOG (this->node->log) << boost::str (boost::format ("Initiating frontier request"));
}
connection->frontier_request ();
};
break;
case rai::attempt_state::requesting_frontiers:
break;
case rai::attempt_state::requesting_pulls:
if (!pulls.empty ())
{
// There are more things to pull
auto pull (pulls.back ());
pulls.pop_back ();
action = [connection, pull] ()
{
connection->pull_client.request (pull);
};
}
else
{
if (active.empty ())
{
// No one else is still running, we're done with pulls
state = rai::attempt_state::pushing;
action = [this, connection] ()
{
completed_pulls (connection);
};
}
else
{
// Drop this connection
break;
}
}
break;
case rai::attempt_state::pushing:
case rai::attempt_state::complete:
// Drop this connection
break;
};
if (action)
{
// If there's an action, move the connection from idle to active.
active [connection.get ()] = connection;
BOOST_LOG (this->node->log) << boost::str (boost::format ("Initiating frontier request"));
}
}
}
if (action)
frontier_request ();
break;
case rai::attempt_state::requesting_frontiers:
poll_l = true;
break;
case rai::attempt_state::requesting_pulls:
if (!attempt->pulls.empty ())
{
// There are more things to pull
auto pull (attempt->pulls.front ());
attempt->pulls.pop_front ();
++attempt->pulling;
lock.unlock ();
pull_client.request (pull);
}
else
{
if (attempt->pulling == 0)
{
attempt->state = rai::attempt_state::pushing;
lock.unlock ();
completed_pulls ();
}
else
{
poll_l = true;
}
}
break;
case rai::attempt_state::pushing:
case rai::attempt_state::complete:
break;
};
if (poll_l)
{
action ();
dispatch_work ();
poll ();
}
}
@ -975,6 +914,7 @@ void rai::bootstrap_attempt::requeue_pull (rai::pull_info const & pull_a)
auto pull (pull_a);
if (++pull.attempts < 16)
{
std::lock_guard <std::mutex> lock (mutex);
pulls.push_front (pull);
}
else

View file

@ -74,23 +74,13 @@ public:
void populate_connections ();
void add_connection (rai::endpoint const &);
void stop ();
void pool_connection (std::shared_ptr <rai::bootstrap_client>);
void connection_ending (rai::bootstrap_client *);
void completed_requests (std::shared_ptr <rai::bootstrap_client>);
void completed_pull (std::shared_ptr <rai::bootstrap_client>);
void completed_pulls (std::shared_ptr <rai::bootstrap_client>);
void completed_pushes (std::shared_ptr <rai::bootstrap_client>);
void dispatch_work ();
void requeue_pull (rai::pull_info const &);
std::deque <rai::pull_info> pulls;
std::unordered_map <rai::bootstrap_client *, std::weak_ptr <rai::bootstrap_client>> connecting;
std::unordered_map <rai::bootstrap_client *, std::weak_ptr <rai::bootstrap_client>> active;
std::vector <std::shared_ptr <rai::bootstrap_client>> idle;
std::atomic <unsigned> connections;
unsigned pulling;
std::shared_ptr <rai::node> node;
rai::attempt_state state;
std::unordered_set <rai::endpoint> attempted;
private:
std::shared_ptr <rai::bootstrap_client> start_connection (rai::endpoint const &);
std::mutex mutex;
};
class frontier_req_client : public std::enable_shared_from_this <rai::frontier_req_client>
@ -106,6 +96,7 @@ public:
std::shared_ptr <rai::bootstrap_client> connection;
rai::account current;
rai::account_info info;
unsigned count;
};
class bulk_pull_client
{
@ -129,7 +120,13 @@ public:
~bootstrap_client ();
void run ();
void frontier_request ();
void work ();
void poll ();
void completed_frontier_request ();
void sent_request (boost::system::error_code const &, size_t);
void completed_pull ();
void completed_pulls ();
void completed_pushes ();
std::shared_ptr <rai::bootstrap_client> shared ();
void start_timeout ();
void stop_timeout ();
@ -137,7 +134,6 @@ public:
std::shared_ptr <rai::bootstrap_attempt> attempt;
boost::asio::ip::tcp::socket socket;
std::array <uint8_t, 200> receive_buffer;
bool connected;
rai::bulk_pull_client pull_client;
rai::tcp_endpoint endpoint;
boost::asio::deadline_timer timeout;