diff --git a/rai/node/bootstrap.cpp b/rai/node/bootstrap.cpp index ed374e68..182e433f 100755 --- a/rai/node/bootstrap.cpp +++ b/rai/node/bootstrap.cpp @@ -151,20 +151,14 @@ rai::bootstrap_client::bootstrap_client (std::shared_ptr node_a, std node (node_a), attempt (attempt_a), socket (node_a->network.service), -pull_client (*this), endpoint (endpoint_a), timeout (node_a->network.service) { + ++attempt->connections; } rai::bootstrap_client::~bootstrap_client () { - if (!pull_client.pull.account.is_zero ()) - { - // If this connection is ending and request_account hasn't been cleared it didn't finish, requeue - attempt->requeue_pull (pull_client.pull); - --attempt->pulling; - } --attempt->connections; } @@ -264,7 +258,7 @@ std::shared_ptr rai::bootstrap_client::shared () return shared_from_this (); } -rai::frontier_req_client::frontier_req_client (std::shared_ptr const & connection_a) : +rai::frontier_req_client::frontier_req_client (std::shared_ptr connection_a) : connection (connection_a), current (0), count (0), @@ -443,9 +437,8 @@ void rai::frontier_req_client::next (MDB_txn * transaction_a) } } -rai::bulk_pull_client::bulk_pull_client (rai::bootstrap_client & connection_a) : -connection (connection_a), -account_count (0) +rai::bulk_pull_client::bulk_pull_client (std::shared_ptr connection_a) : +connection (connection_a) { } @@ -465,103 +458,112 @@ void rai::bulk_pull_client::request (rai::pull_info const & pull_a) rai::vectorstream stream (*buffer); req.serialize (stream); } - if (connection.node->config.logging.bulk_pull_logging ()) + if (connection->node->config.logging.bulk_pull_logging ()) { - BOOST_LOG (connection.node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection.endpoint); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection->endpoint); } - else if (connection.node->config.logging.network_logging () && account_count % 256 == 0) + else if (connection->node->config.logging.network_logging () && connection->attempt->account_count++ % 256 == 0) { - BOOST_LOG (connection.node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection.endpoint); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection->endpoint); } - ++account_count; - auto connection_l (connection.shared ()); - connection.start_timeout (); - boost::asio::async_write (connection.socket, boost::asio::buffer (buffer->data (), buffer->size ()), [connection_l, buffer] (boost::system::error_code const & ec, size_t size_a) + auto this_l (shared_from_this ()); + connection->start_timeout (); + boost::asio::async_write (connection->socket, boost::asio::buffer (buffer->data (), buffer->size ()), [this_l, buffer] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); + this_l->connection->stop_timeout (); if (!ec) { - connection_l->pull_client.receive_block (); + this_l->receive_block (); } else { - BOOST_LOG (connection_l->node->log) << boost::str (boost::format ("Error sending bulk pull request %1% to %2%") % ec.message () % connection_l->endpoint); + BOOST_LOG (this_l->connection->node->log) << boost::str (boost::format ("Error sending bulk pull request %1% to %2%") % ec.message () % this_l->connection->endpoint); } }); } void rai::bulk_pull_client::receive_block () { - auto connection_l (connection.shared ()); - connection.start_timeout (); - boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data (), 1), [connection_l] (boost::system::error_code const & ec, size_t size_a) + auto this_l (shared_from_this ()); + connection->start_timeout (); + boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data (), 1), [this_l] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); + this_l->connection->stop_timeout (); if (!ec) { - connection_l->pull_client.received_type (); + this_l->received_type (); } else { - BOOST_LOG (connection_l->node->log) << boost::str (boost::format ("Error receiving block type %1%") % ec.message ()); + BOOST_LOG (this_l->connection->node->log) << boost::str (boost::format ("Error receiving block type %1%") % ec.message ()); } }); } void rai::bulk_pull_client::received_type () { - auto connection_l (connection.shared ()); - rai::block_type type (static_cast (connection.receive_buffer [0])); + auto this_l (shared_from_this ()); + rai::block_type type (static_cast (connection->receive_buffer [0])); switch (type) { case rai::block_type::send: { - connection.start_timeout (); - boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::send_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) + connection->start_timeout (); + boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::send_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); - connection_l->pull_client.received_block (ec, size_a); + this_l->connection->stop_timeout (); + this_l->received_block (ec, size_a); }); break; } case rai::block_type::receive: { - connection.start_timeout (); - boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::receive_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) + connection->start_timeout (); + boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::receive_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); - connection_l->pull_client.received_block (ec, size_a); + this_l->connection->stop_timeout (); + this_l->received_block (ec, size_a); }); break; } case rai::block_type::open: { - connection.start_timeout (); - boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::open_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) + connection->start_timeout (); + boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::open_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); - connection_l->pull_client.received_block (ec, size_a); + this_l->connection->stop_timeout (); + this_l->received_block (ec, size_a); }); break; } case rai::block_type::change: { - connection.start_timeout (); - boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::change_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) + connection->start_timeout (); + boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::change_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); - connection_l->pull_client.received_block (ec, size_a); + this_l->connection->stop_timeout (); + this_l->received_block (ec, size_a); }); break; } case rai::block_type::not_a_block: { - connection.completed_pull (); + if (expected == pull.end) + { + pull = rai::pull_info (); + --connection->attempt->pulling; + connection->work (); + } + else + { + connection->attempt->requeue_pull (pull); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % connection->endpoint); + } break; } default: { - BOOST_LOG (connection.node->log) << boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast (type)); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast (type)); break; } } @@ -571,22 +573,22 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec { if (!ec) { - rai::bufferstream stream (connection.receive_buffer.data (), 1 + size_a); + rai::bufferstream stream (connection->receive_buffer.data (), 1 + size_a); std::shared_ptr block (rai::deserialize_block (stream)); if (block != nullptr) { auto hash (block->hash ()); - if (connection.node->config.logging.bulk_pull_logging ()) + if (connection->node->config.logging.bulk_pull_logging ()) { std::string block_l; block->serialize_json (block_l); - BOOST_LOG (connection.node->log) << boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l); } if (hash == expected) { expected = block->previous (); } - auto attempt (connection.attempt); + auto attempt (connection->attempt); // Process the block asynchronously from making the next network requests since this is a potentially long operation // Hold a reference to the current attempt so we don't start another one while blocks are being processed. attempt->node->background ([attempt, block] () @@ -617,12 +619,12 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec } else { - BOOST_LOG (connection.node->log) << "Error deserializing block received from pull request"; + BOOST_LOG (connection->node->log) << "Error deserializing block received from pull request"; } } else { - BOOST_LOG (connection.node->log) << boost::str (boost::format ("Error bulk receiving block: %1%") % ec.message ()); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Error bulk receiving block: %1%") % ec.message ()); } } @@ -776,7 +778,8 @@ rai::bootstrap_attempt::bootstrap_attempt (std::shared_ptr node_a) : connections (0), pulling (0), node (node_a), -state (rai::attempt_state::starting) +state (rai::attempt_state::starting), +account_count (0) { BOOST_LOG (node->log) << "Starting bootstrap attempt"; } @@ -789,7 +792,7 @@ rai::bootstrap_attempt::~bootstrap_attempt () void rai::bootstrap_attempt::populate_connections () { - if (++connections < node->config.bootstrap_connections) + if (connections < node->config.bootstrap_connections) { auto peer (node->peers.bootstrap_peer ()); if (peer != rai::endpoint ()) @@ -797,14 +800,6 @@ void rai::bootstrap_attempt::populate_connections () auto client (std::make_shared (node, shared_from_this (), rai::tcp_endpoint (peer.address (), peer.port ()))); client->run (); } - else - { - --connections; - } - } - else - { - --connections; } std::weak_ptr this_w (shared_from_this ()); switch (state) @@ -850,21 +845,6 @@ void rai::bootstrap_client::completed_frontier_request () work (); } -void rai::bootstrap_client::completed_pull () -{ - if (pull_client.expected == pull_client.pull.end) - { - pull_client.pull = rai::pull_info (); - --attempt->pulling; - work (); - } - else - { - attempt->requeue_pull (pull_client.pull); - BOOST_LOG (node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % endpoint); - } -} - void rai::bootstrap_client::completed_pulls () { BOOST_LOG (node->log) << "Completed pulls"; @@ -914,7 +894,8 @@ void rai::bootstrap_client::work () attempt->pulls.pop_front (); ++attempt->pulling; lock.unlock (); - pull_client.request (pull); + auto pull_client (std::make_shared (shared_from_this ())); + pull_client->request (pull); } else { diff --git a/rai/node/bootstrap.hpp b/rai/node/bootstrap.hpp index 891ba3fd..6306662b 100644 --- a/rai/node/bootstrap.hpp +++ b/rai/node/bootstrap.hpp @@ -82,11 +82,12 @@ public: rai::attempt_state state; std::unordered_set attempted; std::mutex mutex; + std::atomic account_count; }; class frontier_req_client : public std::enable_shared_from_this { public: - frontier_req_client (std::shared_ptr const &); + frontier_req_client (std::shared_ptr ); ~frontier_req_client (); void run (); void receive_frontier (); @@ -100,18 +101,17 @@ public: unsigned count; std::chrono::system_clock::time_point next_report; }; -class bulk_pull_client +class bulk_pull_client : public std::enable_shared_from_this { public: - bulk_pull_client (rai::bootstrap_client &); + bulk_pull_client (std::shared_ptr ); ~bulk_pull_client (); void request (rai::pull_info const &); void receive_block (); void received_type (); void received_block (boost::system::error_code const &, size_t); rai::block_hash first (); - rai::bootstrap_client & connection; - size_t account_count; + std::shared_ptr connection; rai::block_hash expected; rai::pull_info pull; }; @@ -126,7 +126,6 @@ public: void poll (); void completed_frontier_request (); void sent_request (boost::system::error_code const &, size_t); - void completed_pull (); void completed_pulls (); void completed_pushes (); std::shared_ptr shared (); @@ -136,7 +135,6 @@ public: std::shared_ptr attempt; boost::asio::ip::tcp::socket socket; std::array receive_buffer; - rai::bulk_pull_client pull_client; rai::tcp_endpoint endpoint; boost::asio::deadline_timer timeout; };