Avoid long read transactions for bootstrap (#2513)

* Avoid long transactions holding for bootstrap clients: frontier_req_client, bulk_push_client. Also avoid direct transactions usage if possible (use wrapper functions)
* Use nano::transaction::refresh () function to not open read transactions for too long
This commit is contained in:
Sergey Kroshnin 2020-01-27 17:22:29 +03:00 committed by GitHub
commit f8888f6513
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 39 additions and 23 deletions

View file

@ -884,6 +884,7 @@ void nano::bootstrap_attempt::lazy_pull_flush ()
batch_count = std::max (node->network_params.bootstrap.lazy_min_pull_blocks, batch_count_min); batch_count = std::max (node->network_params.bootstrap.lazy_min_pull_blocks, batch_count_min);
} }
} }
uint64_t read_count (0);
size_t count (0); size_t count (0);
auto transaction (node->store.tx_begin_read ()); auto transaction (node->store.tx_begin_read ());
while (!lazy_pulls.empty () && count < max_pulls) while (!lazy_pulls.empty () && count < max_pulls)
@ -896,6 +897,12 @@ void nano::bootstrap_attempt::lazy_pull_flush ()
++count; ++count;
} }
lazy_pulls.pop_front (); lazy_pulls.pop_front ();
// We don't want to open read transactions for too long
++read_count;
if (read_count % batch_read_size == 0)
{
transaction.refresh ();
}
} }
} }
} }
@ -907,6 +914,7 @@ bool nano::bootstrap_attempt::lazy_finished ()
return true; return true;
} }
bool result (true); bool result (true);
uint64_t read_count (0);
auto transaction (node->store.tx_begin_read ()); auto transaction (node->store.tx_begin_read ());
nano::lock_guard<std::mutex> lazy_lock (lazy_mutex); nano::lock_guard<std::mutex> lazy_lock (lazy_mutex);
for (auto it (lazy_keys.begin ()), end (lazy_keys.end ()); it != end && !stopped;) for (auto it (lazy_keys.begin ()), end (lazy_keys.end ()); it != end && !stopped;)
@ -921,6 +929,12 @@ bool nano::bootstrap_attempt::lazy_finished ()
break; break;
// No need to increment `it` as we break above. // No need to increment `it` as we break above.
} }
// We don't want to open read transactions for too long
++read_count;
if (read_count % batch_read_size == 0)
{
transaction.refresh ();
}
} }
// Finish lazy bootstrap without lazy pulls (in combination with still_pulling ()) // Finish lazy bootstrap without lazy pulls (in combination with still_pulling ())
if (!result && lazy_pulls.empty () && lazy_state_backlog.empty ()) if (!result && lazy_pulls.empty () && lazy_state_backlog.empty ())
@ -1201,6 +1215,7 @@ void nano::bootstrap_attempt::lazy_block_state_backlog_check (std::shared_ptr<na
void nano::bootstrap_attempt::lazy_backlog_cleanup () void nano::bootstrap_attempt::lazy_backlog_cleanup ()
{ {
uint64_t read_count (0);
auto transaction (node->store.tx_begin_read ()); auto transaction (node->store.tx_begin_read ());
nano::lock_guard<std::mutex> lazy_lock (lazy_mutex); nano::lock_guard<std::mutex> lazy_lock (lazy_mutex);
for (auto it (lazy_state_backlog.begin ()), end (lazy_state_backlog.end ()); it != end && !stopped;) for (auto it (lazy_state_backlog.begin ()), end (lazy_state_backlog.end ()); it != end && !stopped;)
@ -1223,6 +1238,12 @@ void nano::bootstrap_attempt::lazy_backlog_cleanup ()
lazy_add (it->first, it->second.retry_limit); lazy_add (it->first, it->second.retry_limit);
++it; ++it;
} }
// We don't want to open read transactions for too long
++read_count;
if (read_count % batch_read_size == 0)
{
transaction.refresh ();
}
} }
} }

View file

@ -164,6 +164,8 @@ public:
std::mutex lazy_mutex; std::mutex lazy_mutex;
// Wallet lazy bootstrap // Wallet lazy bootstrap
std::deque<nano::account> wallet_accounts; std::deque<nano::account> wallet_accounts;
/** The maximum number of records to be read in while iterating over long lazy containers */
static uint64_t constexpr batch_read_size = 256;
}; };
class bootstrap_client final : public std::enable_shared_from_this<bootstrap_client> class bootstrap_client final : public std::enable_shared_from_this<bootstrap_client>
{ {

View file

@ -366,8 +366,7 @@ void nano::bulk_pull_account_client::receive_pending ()
{ {
if (!pending.is_zero ()) if (!pending.is_zero ())
{ {
auto transaction (this_l->connection->node->store.tx_begin_read ()); if (!this_l->connection->node->ledger.block_exists (pending))
if (!this_l->connection->node->store.block_exists (transaction, pending))
{ {
this_l->connection->attempt->lazy_start (pending); this_l->connection->attempt->lazy_start (pending);
} }
@ -552,8 +551,7 @@ std::shared_ptr<nano::block> nano::bulk_pull_server::get_next ()
if (send_current) if (send_current)
{ {
auto transaction (connection->node->store.tx_begin_read ()); result = connection->node->block (current);
result = connection->node->store.block_get (transaction, current);
if (result != nullptr && set_current_to_end == false) if (result != nullptr && set_current_to_end == false)
{ {
auto previous (result->previous ()); auto previous (result->previous ());

View file

@ -20,10 +20,9 @@ void nano::bulk_push_client::start ()
auto this_l (shared_from_this ()); auto this_l (shared_from_this ());
connection->channel->send ( connection->channel->send (
message, [this_l](boost::system::error_code const & ec, size_t size_a) { message, [this_l](boost::system::error_code const & ec, size_t size_a) {
auto transaction (this_l->connection->node->store.tx_begin_read ());
if (!ec) if (!ec)
{ {
this_l->push (transaction); this_l->push ();
} }
else else
{ {
@ -36,7 +35,7 @@ void nano::bulk_push_client::start ()
false); // is bootstrap traffic is_droppable false false); // is bootstrap traffic is_droppable false
} }
void nano::bulk_push_client::push (nano::transaction const & transaction_a) void nano::bulk_push_client::push ()
{ {
std::shared_ptr<nano::block> block; std::shared_ptr<nano::block> block;
bool finished (false); bool finished (false);
@ -57,7 +56,7 @@ void nano::bulk_push_client::push (nano::transaction const & transaction_a)
} }
if (!finished) if (!finished)
{ {
block = connection->node->store.block_get (transaction_a, current_target.first); block = connection->node->block (current_target.first);
if (block == nullptr) if (block == nullptr)
{ {
current_target.first = nano::block_hash (0); current_target.first = nano::block_hash (0);
@ -108,8 +107,7 @@ void nano::bulk_push_client::push_block (nano::block const & block_a)
connection->channel->send_buffer (nano::shared_const_buffer (std::move (buffer)), nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) { connection->channel->send_buffer (nano::shared_const_buffer (std::move (buffer)), nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec) if (!ec)
{ {
auto transaction (this_l->connection->node->store.tx_begin_read ()); this_l->push ();
this_l->push (transaction);
} }
else else
{ {

View file

@ -6,7 +6,6 @@
namespace nano namespace nano
{ {
class transaction;
class bootstrap_client; class bootstrap_client;
class bulk_push_client final : public std::enable_shared_from_this<nano::bulk_push_client> class bulk_push_client final : public std::enable_shared_from_this<nano::bulk_push_client>
{ {
@ -14,7 +13,7 @@ public:
explicit bulk_push_client (std::shared_ptr<nano::bootstrap_client> const &); explicit bulk_push_client (std::shared_ptr<nano::bootstrap_client> const &);
~bulk_push_client (); ~bulk_push_client ();
void start (); void start ();
void push (nano::transaction const &); void push ();
void push_block (nano::block const &); void push_block (nano::block const &);
void send_finished (); void send_finished ();
std::shared_ptr<nano::bootstrap_client> connection; std::shared_ptr<nano::bootstrap_client> connection;

View file

@ -42,8 +42,7 @@ current (0),
count (0), count (0),
bulk_push_cost (0) bulk_push_cost (0)
{ {
auto transaction (connection->node->store.tx_begin_read ()); next ();
next (transaction);
} }
nano::frontier_req_client::~frontier_req_client () nano::frontier_req_client::~frontier_req_client ()
@ -123,14 +122,13 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
{ {
connection->node->logger.always_log (boost::str (boost::format ("Received %1% frontiers from %2%") % std::to_string (count) % connection->channel->to_string ())); connection->node->logger.always_log (boost::str (boost::format ("Received %1% frontiers from %2%") % std::to_string (count) % connection->channel->to_string ()));
} }
auto transaction (connection->node->store.tx_begin_read ());
if (!account.is_zero ()) if (!account.is_zero ())
{ {
while (!current.is_zero () && current < account) while (!current.is_zero () && current < account)
{ {
// We know about an account they don't. // We know about an account they don't.
unsynced (frontier, 0); unsynced (frontier, 0);
next (transaction); next ();
} }
if (!current.is_zero ()) if (!current.is_zero ())
{ {
@ -142,7 +140,7 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
} }
else else
{ {
if (connection->node->store.block_exists (transaction, latest)) if (connection->node->ledger.block_exists (latest))
{ {
// We know about a block they don't. // We know about a block they don't.
unsynced (frontier, latest); unsynced (frontier, latest);
@ -155,7 +153,7 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
bulk_push_cost += 5; bulk_push_cost += 5;
} }
} }
next (transaction); next ();
} }
else else
{ {
@ -175,7 +173,7 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
{ {
// We know about an account they don't. // We know about an account they don't.
unsynced (frontier, 0); unsynced (frontier, 0);
next (transaction); next ();
} }
if (connection->node->config.logging.bulk_pull_logging ()) if (connection->node->config.logging.bulk_pull_logging ())
{ {
@ -202,13 +200,14 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
} }
} }
void nano::frontier_req_client::next (nano::transaction const & transaction_a) void nano::frontier_req_client::next ()
{ {
// Filling accounts deque to prevent often read transactions // Filling accounts deque to prevent often read transactions
if (accounts.empty ()) if (accounts.empty ())
{ {
size_t max_size (128); size_t max_size (128);
for (auto i (connection->node->store.latest_begin (transaction_a, current.number () + 1)), n (connection->node->store.latest_end ()); i != n && accounts.size () != max_size; ++i) auto transaction (connection->node->store.tx_begin_read ());
for (auto i (connection->node->store.latest_begin (transaction, current.number () + 1)), n (connection->node->store.latest_end ()); i != n && accounts.size () != max_size; ++i)
{ {
nano::account_info const & info (i->second); nano::account_info const & info (i->second);
nano::account const & account (i->first); nano::account const & account (i->first);

View file

@ -7,7 +7,6 @@
namespace nano namespace nano
{ {
class transaction;
class bootstrap_client; class bootstrap_client;
class frontier_req_client final : public std::enable_shared_from_this<nano::frontier_req_client> class frontier_req_client final : public std::enable_shared_from_this<nano::frontier_req_client>
{ {
@ -18,7 +17,7 @@ public:
void receive_frontier (); void receive_frontier ();
void received_frontier (boost::system::error_code const &, size_t); void received_frontier (boost::system::error_code const &, size_t);
void unsynced (nano::block_hash const &, nano::block_hash const &); void unsynced (nano::block_hash const &, nano::block_hash const &);
void next (nano::transaction const &); void next ();
std::shared_ptr<nano::bootstrap_client> connection; std::shared_ptr<nano::bootstrap_client> connection;
nano::account current; nano::account current;
nano::block_hash frontier; nano::block_hash frontier;