From e45cd5744bc3f0f1a04b8f782f43f635be00fdd8 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Tue, 29 Mar 2016 23:37:03 -0500 Subject: [PATCH] Attempting to connect to several IPs rather than waiting for timeout between each. --- rai/core_test/network.cpp | 2 +- rai/node/bootstrap.cpp | 109 +++++++++++++++++++++++++++----------- rai/node/bootstrap.hpp | 18 +++++-- 3 files changed, 92 insertions(+), 37 deletions(-) diff --git a/rai/core_test/network.cpp b/rai/core_test/network.cpp index 5ee19de2..62a7ea06 100644 --- a/rai/core_test/network.cpp +++ b/rai/core_test/network.cpp @@ -496,7 +496,7 @@ TEST (bootstrap_processor, DISABLED_process_none) TEST (bootstrap_processor, DISABLED_process_incomplete) { rai::system system (24000, 1); - auto node1 (std::make_shared (system.nodes [0])); + auto node1 (std::make_shared (system.nodes [0], nullptr)); rai::genesis genesis; auto frontier_req_client (std::make_shared (node1)); frontier_req_client->pulls [rai::test_genesis_key.pub] = genesis.hash (); diff --git a/rai/node/bootstrap.cpp b/rai/node/bootstrap.cpp index adc42117..02c2ddc5 100644 --- a/rai/node/bootstrap.cpp +++ b/rai/node/bootstrap.cpp @@ -159,10 +159,10 @@ std::unique_ptr rai::push_synchronization::retrieve (rai::transacti return store.block_get (transaction_a, hash_a); } -rai::bootstrap_client::bootstrap_client (std::shared_ptr node_a, std::function const & completion_action_a) : +rai::bootstrap_client::bootstrap_client (std::shared_ptr node_a, std::shared_ptr attempt_a) : node (node_a), -socket (node_a->network.service), -completion_action (completion_action_a) +attempt (attempt_a), +socket (node_a->network.service) { } @@ -172,7 +172,6 @@ rai::bootstrap_client::~bootstrap_client () { BOOST_LOG (node->log) << "Exiting bootstrap client"; } - completion_action (); } void rai::bootstrap_client::run (boost::asio::ip::tcp::endpoint const & endpoint_a) @@ -186,7 +185,18 @@ void rai::bootstrap_client::run (boost::asio::ip::tcp::endpoint const & endpoint { if (!ec) { - this_l->connect_action (); + { + std::lock_guard lock (this_l->node->bootstrap_initiator.mutex); + this_l->attempt->peers.clear (); + } + if (!this_l->attempt->connected.test_and_set ()) + { + this_l->connect_action (); + } + else + { + BOOST_LOG (this_l->node->log) << boost::str (boost::format ("Disconnecting from: %1% because bootstrap in progress") % endpoint_a); + } } else { @@ -762,6 +772,38 @@ void rai::bulk_push_client::push_block (rai::block const & block_a) }); } +rai::bootstrap_attempt::bootstrap_attempt (std::shared_ptr node_a, std::vector const & peers_a) : +node (node_a), +peers (peers_a) +{ + connected.clear (); +} + +rai::bootstrap_attempt::~bootstrap_attempt () +{ + std::lock_guard lock (node->bootstrap_initiator.mutex); + node->bootstrap_initiator.in_progress = false; + node->bootstrap_initiator.notify_listeners (); +} + +void rai::bootstrap_attempt::attempt () +{ + if (!peers.empty ()) + { + rai::endpoint endpoint (peers.back ()); + peers.pop_back (); + BOOST_LOG (node->log) << boost::str (boost::format ("Initiating bootstrap to: %1%") % endpoint); + auto node_l (node->shared ()); + auto this_l (shared_from_this ()); + auto processor (std::make_shared (node_l, this_l)); + processor->run (rai::tcp_endpoint (endpoint.address (), endpoint.port ())); + node->alarm.add (std::chrono::system_clock::now () + std::chrono::milliseconds (150), [this_l] () + { + this_l->attempt (); + }); + } +} + rai::bootstrap_initiator::bootstrap_initiator (rai::node & node_a) : node (node_a), in_progress (false), @@ -769,48 +811,51 @@ warmed_up (false) { } -void rai::bootstrap_initiator::warmup (rai::endpoint const & endpoint_a) +void rai::bootstrap_initiator::warmup (rai::endpoint const &) { - std::lock_guard lock (mutex); - if (!in_progress && warmed_up.size () < 2 && !in_progress && warmed_up.find (endpoint_a) == warmed_up.end ()) + auto do_warmup (false); { - warmed_up.insert (endpoint_a); - in_progress = true; - notify_listeners (); - initiate (endpoint_a); + std::lock_guard lock (mutex); + if (!in_progress && warmed_up < 3) + { + ++warmed_up; + do_warmup = true; + } + } + if (do_warmup) + { + bootstrap_any (); } } void rai::bootstrap_initiator::bootstrap (rai::endpoint const & endpoint_a) { - std::lock_guard lock (mutex); - if (!in_progress) - { - in_progress = true; - initiate (endpoint_a); - } + std::vector endpoints; + endpoints.push_back (endpoint_a); + begin_attempt (std::make_shared (node.shared (), endpoints)); } void rai::bootstrap_initiator::bootstrap_any () { - auto list (node.peers.bootstrap_candidates ()); - if (!list.empty ()) - { - bootstrap (list [random_pool.GenerateWord32 (0, list.size () - 1)].endpoint); - } + auto peers (node.peers.bootstrap_candidates ()); + std::vector endpoints; + for (auto &i: peers) + { + endpoints.push_back (i.endpoint); + } + std::random_shuffle (endpoints.begin (), endpoints.end ()); + begin_attempt (std::make_shared (node.shared (), endpoints)); } -void rai::bootstrap_initiator::initiate (rai::endpoint const & endpoint_a) +void rai::bootstrap_initiator::begin_attempt (std::shared_ptr attempt_a) { - BOOST_LOG (node.log) << boost::str (boost::format ("Initiating bootstrap to: %1%") % endpoint_a); - auto node_l (node.shared ()); - auto processor (std::make_shared (node_l, [node_l] () + std::lock_guard lock (mutex); + if (!in_progress) { - std::lock_guard lock (node_l->bootstrap_initiator.mutex); - node_l->bootstrap_initiator.in_progress = false; - node_l->bootstrap_initiator.notify_listeners (); - })); - processor->run (rai::tcp_endpoint (endpoint_a.address (), endpoint_a.port ())); + in_progress = true; + attempt_a->attempt (); + notify_listeners (); + } } void rai::bootstrap_initiator::add_observer (std::function const & observer_a) diff --git a/rai/node/bootstrap.hpp b/rai/node/bootstrap.hpp index 628e3c7c..08b9c599 100644 --- a/rai/node/bootstrap.hpp +++ b/rai/node/bootstrap.hpp @@ -44,17 +44,27 @@ public: std::unique_ptr retrieve (rai::transaction &, rai::block_hash const &) override; }; class node; +class bootstrap_attempt : public std::enable_shared_from_this +{ +public: + bootstrap_attempt (std::shared_ptr node_a, std::vector const & peers_a); + ~bootstrap_attempt (); + void attempt (); + std::shared_ptr node; + std::vector peers; + std::atomic_flag connected; +}; class bootstrap_client : public std::enable_shared_from_this { public: - bootstrap_client (std::shared_ptr , std::function const & = [] () {}); + bootstrap_client (std::shared_ptr , std::shared_ptr ); ~bootstrap_client (); void run (rai::tcp_endpoint const &); void connect_action (); void sent_request (boost::system::error_code const &, size_t); std::shared_ptr node; + std::shared_ptr attempt; boost::asio::ip::tcp::socket socket; - std::function completion_action; }; class frontier_req_client : public std::enable_shared_from_this { @@ -113,13 +123,13 @@ public: void warmup (rai::endpoint const &); void bootstrap (rai::endpoint const &); void bootstrap_any (); - void initiate (rai::endpoint const &); + void begin_attempt (std::shared_ptr ); void notify_listeners (); void add_observer (std::function const &); std::mutex mutex; rai::node & node; bool in_progress; - std::unordered_set warmed_up; + unsigned warmed_up; private: std::vector > observers; };