From 77abcc9304f01542b5a262af339c84989598b0af Mon Sep 17 00:00:00 2001 From: clemahieu Date: Mon, 3 Jul 2017 00:19:44 -0500 Subject: [PATCH] Reworking bootstrap work process. --- rai/node/bootstrap.cpp | 332 +++++++++++++++++------------------------ rai/node/bootstrap.hpp | 22 ++- 2 files changed, 145 insertions(+), 209 deletions(-) diff --git a/rai/node/bootstrap.cpp b/rai/node/bootstrap.cpp index dacf6a71..33178b96 100755 --- a/rai/node/bootstrap.cpp +++ b/rai/node/bootstrap.cpp @@ -151,7 +151,6 @@ rai::bootstrap_client::bootstrap_client (std::shared_ptr node_a, std node (node_a), attempt (attempt_a), socket (node_a->network.service), -connected (false), pull_client (*this), endpoint (endpoint_a), timeout (node_a->network.service) @@ -160,7 +159,13 @@ timeout (node_a->network.service) rai::bootstrap_client::~bootstrap_client () { - attempt->connection_ending (this); + if (!pull_client.pull.account.is_zero ()) + { + // If this connection is ending and request_account hasn't been cleared it didn't finish, requeue + attempt->requeue_pull (pull_client.pull); + --attempt->pulling; + } + --attempt->connections; } void rai::bootstrap_client::start_timeout () @@ -174,10 +179,7 @@ void rai::bootstrap_client::start_timeout () auto this_l (this_w.lock ()); if (this_l != nullptr) { - if (!this_l->connected) - { - this_l->socket.close (); - } + this_l->socket.close (); } } }); @@ -199,8 +201,7 @@ void rai::bootstrap_client::run () if (!ec) { BOOST_LOG (this_l->node->log) << boost::str (boost::format ("Connection established to %1%") % this_l->endpoint); - this_l->connected = true; - this_l->attempt->pool_connection (this_l); + this_l->work (); } else { @@ -265,7 +266,8 @@ std::shared_ptr rai::bootstrap_client::shared () rai::frontier_req_client::frontier_req_client (std::shared_ptr const & connection_a) : connection (connection_a), -current (0) +current (0), +count (0) { next (); } @@ -319,6 +321,11 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons rai::bufferstream latest_stream (connection->receive_buffer.data () + sizeof (rai::uint256_union), sizeof (rai::uint256_union)); auto error2 (rai::read (latest_stream, latest)); assert (!error2); + ++count; + if (count % 16384 == 0) + { + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Received %1% frontiers from %2%") % std::to_string (count) % connection->socket.remote_endpoint ()); + } if (!account.is_zero ()) { while (!current.is_zero () && current < account) @@ -353,7 +360,16 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons else { // They know about a block we don't. - connection->attempt->pulls.push_back (rai::pull_info (account, latest, info.head)); + rai::account landing ("059F68AAB29DE0D3A27443625C7EA9CDDB6517A8B76FE37727EF6A4D76832AD5"); + rai::account faucet ("8E319CE6F3025E5B2DF66DA7AB1467FE48F1679C13DD43BFDB29FA2E9FC40D3B"); + if (account != rai::genesis_account && account != landing && account != faucet) + { + connection->attempt->pulls.push_back (rai::pull_info (account, latest, info.head)); + } + else + { + connection->attempt->pulls.push_front (rai::pull_info (account, latest, info.head)); + } } } next (); @@ -384,7 +400,7 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons next (); } } - connection->attempt->completed_requests (connection); + connection->completed_frontier_request (); } } else @@ -514,7 +530,7 @@ void rai::bulk_pull_client::received_type () } case rai::block_type::not_a_block: { - connection.attempt->completed_pull (connection.shared ()); + connection.completed_pull (); break; } default: @@ -675,7 +691,7 @@ void rai::bulk_push_client::send_finished () auto this_l (shared_from_this ()); async_write (connection->socket, boost::asio::buffer (buffer->data (), 1), [this_l] (boost::system::error_code const & ec, size_t size_a) { - this_l->connection->attempt->completed_pushes (this_l->connection); + this_l->connection->completed_pushes (); }); } @@ -726,6 +742,8 @@ attempts (0) } rai::bootstrap_attempt::bootstrap_attempt (std::shared_ptr node_a) : +connections (0), +pulling (0), node (node_a), state (rai::attempt_state::starting) { @@ -740,233 +758,154 @@ rai::bootstrap_attempt::~bootstrap_attempt () void rai::bootstrap_attempt::populate_connections () { - std::weak_ptr this_w (shared_from_this ()); - std::shared_ptr client; + if (++connections < node->config.bootstrap_connections) { - std::lock_guard lock (mutex); - if (connecting.size () + active.size () + idle.size () < node->config.bootstrap_connections) + auto peer (node->peers.bootstrap_peer ()); + if (peer != rai::endpoint ()) { - auto peer (node->peers.bootstrap_peer ()); - if (peer != rai::endpoint ()) + auto client (std::make_shared (node, shared_from_this (), rai::tcp_endpoint (peer.address (), peer.port ()))); + client->run (); + } + else + { + --connections; + } + } + else + { + --connections; + } + std::weak_ptr this_w (shared_from_this ()); + switch (state) + { + case rai::attempt_state::starting: + case rai::attempt_state::requesting_frontiers: + case rai::attempt_state::requesting_pulls: + node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [this_w] () { - client = start_connection (peer); - } - } - switch (state) - { - case rai::attempt_state::starting: - case rai::attempt_state::requesting_frontiers: - case rai::attempt_state::requesting_pulls: - node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (1), [this_w] () + if (auto this_l = this_w.lock ()) { - if (auto this_l = this_w.lock ()) - { - this_l->populate_connections (); - } - }); - break; - default: - break; - } + this_l->populate_connections (); + } + }); + break; + default: + break; } } void rai::bootstrap_attempt::add_connection (rai::endpoint const & endpoint_a) { - std::shared_ptr client; - { - std::lock_guard lock (mutex); - client = start_connection (endpoint_a); - } -} - -std::shared_ptr rai::bootstrap_attempt::start_connection (rai::endpoint const & endpoint_a) -{ - assert (!mutex.try_lock ()); - std::shared_ptr client; - if (attempted.find (endpoint_a) == attempted.end ()) - { - attempted.insert (endpoint_a); - auto node_l (node->shared ()); - client = std::make_shared (node_l, shared_from_this (), rai::tcp_endpoint (endpoint_a.address (), endpoint_a.port ())); - connecting [client.get ()] = client; - client->run (); - } - return client; + auto client (std::make_shared (node, shared_from_this (), rai::tcp_endpoint (endpoint_a.address (), endpoint_a.port ()))); + client->run (); } void rai::bootstrap_attempt::stop () { std::lock_guard lock (mutex); state = rai::attempt_state::complete; - for (auto i: connecting) - { - auto attempt (i.second.lock ()); - if (attempt != nullptr) - { - attempt->socket.close (); - } - } - for (auto i: active) - { - auto attempt (i.second.lock ()); - if (attempt != nullptr) - { - attempt->socket.close (); - } - } - idle.clear (); } -void rai::bootstrap_attempt::pool_connection (std::shared_ptr client_a) +void rai::bootstrap_client::completed_frontier_request () { { - std::lock_guard lock (mutex); - auto erased_active (active.erase (client_a.get ())); - auto erased_connecting (connecting.erase (client_a.get ())); - assert (erased_active == 1 || erased_connecting == 1); - idle.push_back (client_a); + std::lock_guard lock (attempt->mutex); + if (node->config.logging.network_logging ()) + { + BOOST_LOG (node->log) << boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % attempt->pulls.size () % endpoint); + } + attempt->state = rai::attempt_state::requesting_pulls; } - dispatch_work (); + work (); } -void rai::bootstrap_attempt::connection_ending (rai::bootstrap_client * client_a) +void rai::bootstrap_client::completed_pull () { - if (client_a->node->network.on) + if (pull_client.expected == pull_client.pull.end) { - std::lock_guard lock (mutex); - if (!client_a->pull_client.pull.account.is_zero ()) - { - // If this connection is ending and request_account hasn't been cleared it didn't finish, requeue - requeue_pull (client_a->pull_client.pull); - } - auto erased_connecting (connecting.erase (client_a)); - auto erased_active (active.erase (client_a)); + pull_client.pull = rai::pull_info (); + --attempt->pulling; + work (); } else { - // If we're stopping, just exit + attempt->requeue_pull (pull_client.pull); + BOOST_LOG (node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % endpoint); } } -void rai::bootstrap_attempt::completed_requests (std::shared_ptr client_a) -{ - { - std::lock_guard lock (mutex); - if (node->config.logging.network_logging ()) - { - BOOST_LOG (node->log) << boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % pulls.size () % client_a->endpoint); - } - state = rai::attempt_state::requesting_pulls; - } - pool_connection (client_a); -} - -void rai::bootstrap_attempt::completed_pull (std::shared_ptr client_a) -{ - auto repool (true); - { - std::lock_guard lock (mutex); - if (client_a->pull_client.expected != client_a->pull_client.pull.end) - { - requeue_pull (client_a->pull_client.pull); - BOOST_LOG (node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % client_a->endpoint); - repool = false; - } - client_a->pull_client.pull = rai::pull_info (); - } - if (repool) - { - pool_connection (client_a); - } -} - -void rai::bootstrap_attempt::completed_pulls (std::shared_ptr client_a) +void rai::bootstrap_client::completed_pulls () { BOOST_LOG (node->log) << "Completed pulls"; assert (node->bootstrap_initiator.in_progress ()); - auto pushes (std::make_shared (client_a)); + auto pushes (std::make_shared (shared_from_this ())); pushes->start (); } -void rai::bootstrap_attempt::completed_pushes (std::shared_ptr client_a) +void rai::bootstrap_client::completed_pushes () { - std::vector > discard; - std::lock_guard lock (mutex); - state = rai::attempt_state::complete; - discard.swap (idle); + std::lock_guard lock (attempt->mutex); + attempt->state = rai::attempt_state::complete; } -void rai::bootstrap_attempt::dispatch_work () +void rai::bootstrap_client::poll () { - std::function action; + auto this_l (shared_from_this ()); + attempt->node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (rai::rai_network == rai::rai_networks::rai_test_network ? 0 : 1), [this_l] () { - std::lock_guard lock (mutex); - if (!idle.empty ()) - { - // We have a connection we could do something with - auto connection (idle.back ()); - idle.pop_back (); - switch (state) + this_l->work (); + }); +} + +void rai::bootstrap_client::work () +{ + auto poll_l (false); + std::unique_lock lock (attempt->mutex); + switch (attempt->state) + { + case rai::attempt_state::starting: + attempt->state = rai::attempt_state::requesting_frontiers; + lock.unlock (); + if (this->node->config.logging.network_logging ()) { - case rai::attempt_state::starting: - state = rai::attempt_state::requesting_frontiers; - action = [connection, this] () - { - if (this->node->config.logging.network_logging ()) - { - BOOST_LOG (this->node->log) << boost::str (boost::format ("Initiating frontier request")); - } - connection->frontier_request (); - }; - break; - case rai::attempt_state::requesting_frontiers: - break; - case rai::attempt_state::requesting_pulls: - if (!pulls.empty ()) - { - // There are more things to pull - auto pull (pulls.back ()); - pulls.pop_back (); - action = [connection, pull] () - { - connection->pull_client.request (pull); - }; - } - else - { - if (active.empty ()) - { - // No one else is still running, we're done with pulls - state = rai::attempt_state::pushing; - action = [this, connection] () - { - completed_pulls (connection); - }; - } - else - { - // Drop this connection - break; - } - } - break; - case rai::attempt_state::pushing: - case rai::attempt_state::complete: - // Drop this connection - break; - }; - if (action) - { - // If there's an action, move the connection from idle to active. - active [connection.get ()] = connection; + BOOST_LOG (this->node->log) << boost::str (boost::format ("Initiating frontier request")); } - } - } - if (action) + frontier_request (); + break; + case rai::attempt_state::requesting_frontiers: + poll_l = true; + break; + case rai::attempt_state::requesting_pulls: + if (!attempt->pulls.empty ()) + { + // There are more things to pull + auto pull (attempt->pulls.front ()); + attempt->pulls.pop_front (); + ++attempt->pulling; + lock.unlock (); + pull_client.request (pull); + } + else + { + if (attempt->pulling == 0) + { + attempt->state = rai::attempt_state::pushing; + lock.unlock (); + completed_pulls (); + } + else + { + poll_l = true; + } + } + break; + case rai::attempt_state::pushing: + case rai::attempt_state::complete: + break; + }; + if (poll_l) { - action (); - dispatch_work (); + poll (); } } @@ -975,6 +914,7 @@ void rai::bootstrap_attempt::requeue_pull (rai::pull_info const & pull_a) auto pull (pull_a); if (++pull.attempts < 16) { + std::lock_guard lock (mutex); pulls.push_front (pull); } else diff --git a/rai/node/bootstrap.hpp b/rai/node/bootstrap.hpp index 20dbfcc4..63086cee 100644 --- a/rai/node/bootstrap.hpp +++ b/rai/node/bootstrap.hpp @@ -74,23 +74,13 @@ public: void populate_connections (); void add_connection (rai::endpoint const &); void stop (); - void pool_connection (std::shared_ptr ); - void connection_ending (rai::bootstrap_client *); - void completed_requests (std::shared_ptr ); - void completed_pull (std::shared_ptr ); - void completed_pulls (std::shared_ptr ); - void completed_pushes (std::shared_ptr ); - void dispatch_work (); void requeue_pull (rai::pull_info const &); std::deque pulls; - std::unordered_map > connecting; - std::unordered_map > active; - std::vector > idle; + std::atomic connections; + unsigned pulling; std::shared_ptr node; rai::attempt_state state; std::unordered_set attempted; -private: - std::shared_ptr start_connection (rai::endpoint const &); std::mutex mutex; }; class frontier_req_client : public std::enable_shared_from_this @@ -106,6 +96,7 @@ public: std::shared_ptr connection; rai::account current; rai::account_info info; + unsigned count; }; class bulk_pull_client { @@ -129,7 +120,13 @@ public: ~bootstrap_client (); void run (); void frontier_request (); + void work (); + void poll (); + void completed_frontier_request (); void sent_request (boost::system::error_code const &, size_t); + void completed_pull (); + void completed_pulls (); + void completed_pushes (); std::shared_ptr shared (); void start_timeout (); void stop_timeout (); @@ -137,7 +134,6 @@ public: std::shared_ptr attempt; boost::asio::ip::tcp::socket socket; std::array receive_buffer; - bool connected; rai::bulk_pull_client pull_client; rai::tcp_endpoint endpoint; boost::asio::deadline_timer timeout;