Attempting to connect to several IPs rather than waiting for timeout between each.

This commit is contained in:
clemahieu 2016-03-29 23:37:03 -05:00
commit e45cd5744b
3 changed files with 92 additions and 37 deletions

View file

@ -496,7 +496,7 @@ TEST (bootstrap_processor, DISABLED_process_none)
TEST (bootstrap_processor, DISABLED_process_incomplete)
{
rai::system system (24000, 1);
auto node1 (std::make_shared <rai::bootstrap_client> (system.nodes [0]));
auto node1 (std::make_shared <rai::bootstrap_client> (system.nodes [0], nullptr));
rai::genesis genesis;
auto frontier_req_client (std::make_shared <rai::frontier_req_client> (node1));
frontier_req_client->pulls [rai::test_genesis_key.pub] = genesis.hash ();

View file

@ -159,10 +159,10 @@ std::unique_ptr <rai::block> rai::push_synchronization::retrieve (rai::transacti
return store.block_get (transaction_a, hash_a);
}
rai::bootstrap_client::bootstrap_client (std::shared_ptr <rai::node> node_a, std::function <void ()> const & completion_action_a) :
rai::bootstrap_client::bootstrap_client (std::shared_ptr <rai::node> node_a, std::shared_ptr <rai::bootstrap_attempt> attempt_a) :
node (node_a),
socket (node_a->network.service),
completion_action (completion_action_a)
attempt (attempt_a),
socket (node_a->network.service)
{
}
@ -172,7 +172,6 @@ rai::bootstrap_client::~bootstrap_client ()
{
BOOST_LOG (node->log) << "Exiting bootstrap client";
}
completion_action ();
}
void rai::bootstrap_client::run (boost::asio::ip::tcp::endpoint const & endpoint_a)
@ -186,7 +185,18 @@ void rai::bootstrap_client::run (boost::asio::ip::tcp::endpoint const & endpoint
{
if (!ec)
{
this_l->connect_action ();
{
std::lock_guard <std::mutex> lock (this_l->node->bootstrap_initiator.mutex);
this_l->attempt->peers.clear ();
}
if (!this_l->attempt->connected.test_and_set ())
{
this_l->connect_action ();
}
else
{
BOOST_LOG (this_l->node->log) << boost::str (boost::format ("Disconnecting from: %1% because bootstrap in progress") % endpoint_a);
}
}
else
{
@ -762,6 +772,38 @@ void rai::bulk_push_client::push_block (rai::block const & block_a)
});
}
rai::bootstrap_attempt::bootstrap_attempt (std::shared_ptr <rai::node> node_a, std::vector <rai::endpoint> const & peers_a) :
node (node_a),
peers (peers_a)
{
connected.clear ();
}
rai::bootstrap_attempt::~bootstrap_attempt ()
{
std::lock_guard <std::mutex> lock (node->bootstrap_initiator.mutex);
node->bootstrap_initiator.in_progress = false;
node->bootstrap_initiator.notify_listeners ();
}
void rai::bootstrap_attempt::attempt ()
{
if (!peers.empty ())
{
rai::endpoint endpoint (peers.back ());
peers.pop_back ();
BOOST_LOG (node->log) << boost::str (boost::format ("Initiating bootstrap to: %1%") % endpoint);
auto node_l (node->shared ());
auto this_l (shared_from_this ());
auto processor (std::make_shared <rai::bootstrap_client> (node_l, this_l));
processor->run (rai::tcp_endpoint (endpoint.address (), endpoint.port ()));
node->alarm.add (std::chrono::system_clock::now () + std::chrono::milliseconds (150), [this_l] ()
{
this_l->attempt ();
});
}
}
rai::bootstrap_initiator::bootstrap_initiator (rai::node & node_a) :
node (node_a),
in_progress (false),
@ -769,48 +811,51 @@ warmed_up (false)
{
}
void rai::bootstrap_initiator::warmup (rai::endpoint const & endpoint_a)
void rai::bootstrap_initiator::warmup (rai::endpoint const &)
{
std::lock_guard <std::mutex> lock (mutex);
if (!in_progress && warmed_up.size () < 2 && !in_progress && warmed_up.find (endpoint_a) == warmed_up.end ())
auto do_warmup (false);
{
warmed_up.insert (endpoint_a);
in_progress = true;
notify_listeners ();
initiate (endpoint_a);
std::lock_guard <std::mutex> lock (mutex);
if (!in_progress && warmed_up < 3)
{
++warmed_up;
do_warmup = true;
}
}
if (do_warmup)
{
bootstrap_any ();
}
}
void rai::bootstrap_initiator::bootstrap (rai::endpoint const & endpoint_a)
{
std::lock_guard <std::mutex> lock (mutex);
if (!in_progress)
{
in_progress = true;
initiate (endpoint_a);
}
std::vector <rai::endpoint> endpoints;
endpoints.push_back (endpoint_a);
begin_attempt (std::make_shared <rai::bootstrap_attempt> (node.shared (), endpoints));
}
void rai::bootstrap_initiator::bootstrap_any ()
{
auto list (node.peers.bootstrap_candidates ());
if (!list.empty ())
{
bootstrap (list [random_pool.GenerateWord32 (0, list.size () - 1)].endpoint);
}
auto peers (node.peers.bootstrap_candidates ());
std::vector <rai::endpoint> endpoints;
for (auto &i: peers)
{
endpoints.push_back (i.endpoint);
}
std::random_shuffle (endpoints.begin (), endpoints.end ());
begin_attempt (std::make_shared <bootstrap_attempt> (node.shared (), endpoints));
}
void rai::bootstrap_initiator::initiate (rai::endpoint const & endpoint_a)
void rai::bootstrap_initiator::begin_attempt (std::shared_ptr <rai::bootstrap_attempt> attempt_a)
{
BOOST_LOG (node.log) << boost::str (boost::format ("Initiating bootstrap to: %1%") % endpoint_a);
auto node_l (node.shared ());
auto processor (std::make_shared <rai::bootstrap_client> (node_l, [node_l] ()
std::lock_guard <std::mutex> lock (mutex);
if (!in_progress)
{
std::lock_guard <std::mutex> lock (node_l->bootstrap_initiator.mutex);
node_l->bootstrap_initiator.in_progress = false;
node_l->bootstrap_initiator.notify_listeners ();
}));
processor->run (rai::tcp_endpoint (endpoint_a.address (), endpoint_a.port ()));
in_progress = true;
attempt_a->attempt ();
notify_listeners ();
}
}
void rai::bootstrap_initiator::add_observer (std::function <void (bool)> const & observer_a)

View file

@ -44,17 +44,27 @@ public:
std::unique_ptr <rai::block> retrieve (rai::transaction &, rai::block_hash const &) override;
};
class node;
class bootstrap_attempt : public std::enable_shared_from_this <bootstrap_attempt>
{
public:
bootstrap_attempt (std::shared_ptr <rai::node> node_a, std::vector <rai::endpoint> const & peers_a);
~bootstrap_attempt ();
void attempt ();
std::shared_ptr <rai::node> node;
std::vector <rai::endpoint> peers;
std::atomic_flag connected;
};
class bootstrap_client : public std::enable_shared_from_this <bootstrap_client>
{
public:
bootstrap_client (std::shared_ptr <rai::node>, std::function <void ()> const & = [] () {});
bootstrap_client (std::shared_ptr <rai::node>, std::shared_ptr <rai::bootstrap_attempt>);
~bootstrap_client ();
void run (rai::tcp_endpoint const &);
void connect_action ();
void sent_request (boost::system::error_code const &, size_t);
std::shared_ptr <rai::node> node;
std::shared_ptr <rai::bootstrap_attempt> attempt;
boost::asio::ip::tcp::socket socket;
std::function <void ()> completion_action;
};
class frontier_req_client : public std::enable_shared_from_this <rai::frontier_req_client>
{
@ -113,13 +123,13 @@ public:
void warmup (rai::endpoint const &);
void bootstrap (rai::endpoint const &);
void bootstrap_any ();
void initiate (rai::endpoint const &);
void begin_attempt (std::shared_ptr <rai::bootstrap_attempt>);
void notify_listeners ();
void add_observer (std::function <void (bool)> const &);
std::mutex mutex;
rai::node & node;
bool in_progress;
std::unordered_set <rai::endpoint> warmed_up;
unsigned warmed_up;
private:
std::vector <std::function <void (bool)>> observers;
};