Demoting tcp_server shared_ptr<nano:node> to a weak_ptr

This commit is contained in:
Colin LeMahieu 2023-04-28 00:27:52 +01:00
commit 1ab5b6708f
No known key found for this signature in database
GPG key ID: 43708520C8DFB938
5 changed files with 300 additions and 105 deletions

View file

@ -369,36 +369,41 @@ void nano::bulk_pull_account_client::receive_pending ()
*/
void nano::bulk_pull_server::set_current_end ()
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
include_start = false;
debug_assert (request != nullptr);
auto transaction (connection->node->store.tx_begin_read ());
if (!connection->node->store.block.exists (transaction, request->end))
auto transaction (node->store.tx_begin_read ());
if (!node->store.block.exists (transaction, request->end))
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Bulk pull end block doesn't exist: %1%, sending everything") % request->end.to_string ()));
node->logger.try_log (boost::str (boost::format ("Bulk pull end block doesn't exist: %1%, sending everything") % request->end.to_string ()));
}
request->end.clear ();
}
if (connection->node->store.block.exists (transaction, request->start.as_block_hash ()))
if (node->store.block.exists (transaction, request->start.as_block_hash ()))
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Bulk pull request for block hash: %1%") % request->start.to_string ()));
node->logger.try_log (boost::str (boost::format ("Bulk pull request for block hash: %1%") % request->start.to_string ()));
}
current = ascending () ? connection->node->store.block.successor (transaction, request->start.as_block_hash ()) : request->start.as_block_hash ();
current = ascending () ? node->store.block.successor (transaction, request->start.as_block_hash ()) : request->start.as_block_hash ();
include_start = true;
}
else
{
auto info = connection->node->ledger.account_info (transaction, request->start.as_account ());
auto info = node->ledger.account_info (transaction, request->start.as_account ());
if (!info)
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Request for unknown account: %1%") % request->start.to_account ()));
node->logger.try_log (boost::str (boost::format ("Request for unknown account: %1%") % request->start.to_account ()));
}
current = request->end;
}
@ -407,12 +412,12 @@ void nano::bulk_pull_server::set_current_end ()
current = ascending () ? info->open_block : info->head;
if (!request->end.is_zero ())
{
auto account (connection->node->ledger.account (transaction, request->end));
auto account (node->ledger.account (transaction, request->end));
if (account != request->start.as_account ())
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Request for block that is not on account chain: %1% not on %2%") % request->end.to_string () % request->start.to_account ()));
node->logger.try_log (boost::str (boost::format ("Request for block that is not on account chain: %1% not on %2%") % request->end.to_string () % request->start.to_account ()));
}
current = request->end;
}
@ -433,6 +438,11 @@ void nano::bulk_pull_server::set_current_end ()
void nano::bulk_pull_server::send_next ()
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
auto block = get_next ();
if (block != nullptr)
{
@ -441,9 +451,9 @@ void nano::bulk_pull_server::send_next ()
nano::vectorstream stream (send_buffer);
nano::serialize_block (stream, *block);
}
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Sending block: %1%") % block->hash ().to_string ()));
node->logger.try_log (boost::str (boost::format ("Sending block: %1%") % block->hash ().to_string ()));
}
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->sent_action (ec, size_a);
@ -457,6 +467,11 @@ void nano::bulk_pull_server::send_next ()
std::shared_ptr<nano::block> nano::bulk_pull_server::get_next ()
{
auto node = connection->node.lock ();
if (!node)
{
return nullptr;
}
std::shared_ptr<nano::block> result;
bool send_current = false, set_current_to_end = false;
@ -496,7 +511,7 @@ std::shared_ptr<nano::block> nano::bulk_pull_server::get_next ()
if (send_current)
{
result = connection->node->block (current);
result = node->block (current);
if (result != nullptr && set_current_to_end == false)
{
auto next = ascending () ? result->sideband ().successor : result->previous ();
@ -528,28 +543,38 @@ std::shared_ptr<nano::block> nano::bulk_pull_server::get_next ()
void nano::bulk_pull_server::sent_action (boost::system::error_code const & ec, std::size_t size_a)
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
if (!ec)
{
connection->node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () {
node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () {
this_l->send_next ();
});
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ()));
node->logger.try_log (boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ()));
}
}
}
void nano::bulk_pull_server::send_finished ()
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
nano::shared_const_buffer send_buffer (static_cast<uint8_t> (nano::block_type::not_a_block));
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Bulk sending finished");
node->logger.try_log ("Bulk sending finished");
}
connection->socket->async_write (send_buffer, [this_l] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->no_block_sent (ec, size_a);
@ -558,6 +583,11 @@ void nano::bulk_pull_server::send_finished ()
void nano::bulk_pull_server::no_block_sent (boost::system::error_code const & ec, std::size_t size_a)
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
if (!ec)
{
debug_assert (size_a == 1);
@ -565,9 +595,9 @@ void nano::bulk_pull_server::no_block_sent (boost::system::error_code const & ec
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Unable to send not-a-block");
node->logger.try_log ("Unable to send not-a-block");
}
}
}
@ -589,6 +619,11 @@ nano::bulk_pull_server::bulk_pull_server (std::shared_ptr<nano::transport::tcp_s
*/
void nano::bulk_pull_account_server::set_params ()
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
debug_assert (request != nullptr);
/*
@ -615,9 +650,9 @@ void nano::bulk_pull_account_server::set_params ()
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Invalid bulk_pull_account flags supplied %1%") % static_cast<uint8_t> (request->flags)));
node->logger.try_log (boost::str (boost::format ("Invalid bulk_pull_account flags supplied %1%") % static_cast<uint8_t> (request->flags)));
}
invalid_request = true;
@ -639,13 +674,18 @@ void nano::bulk_pull_account_server::send_frontier ()
* so handle the invalid_request case by terminating the
* request without any response
*/
auto node = connection->node.lock ();
if (!node)
{
return;
}
if (!invalid_request)
{
auto stream_transaction (connection->node->store.tx_begin_read ());
auto stream_transaction (node->store.tx_begin_read ());
// Get account balance and frontier block hash
auto account_frontier_hash (connection->node->ledger.latest (stream_transaction, request->account));
auto account_frontier_balance_int (connection->node->ledger.account_balance (stream_transaction, request->account));
auto account_frontier_hash (node->ledger.latest (stream_transaction, request->account));
auto account_frontier_balance_int (node->ledger.account_balance (stream_transaction, request->account));
nano::uint128_union account_frontier_balance (account_frontier_balance_int);
// Write the frontier block hash and balance into a buffer
@ -670,6 +710,11 @@ void nano::bulk_pull_account_server::send_next_block ()
* Get the next item from the queue, it is a tuple with the key (which
* contains the account and hash) and data (which contains the amount)
*/
auto node = connection->node.lock ();
if (!node)
{
return;
}
auto block_data (get_next ());
auto block_info_key (block_data.first.get ());
auto block_info (block_data.second.get ());
@ -685,9 +730,9 @@ void nano::bulk_pull_account_server::send_next_block ()
{
nano::vectorstream output_stream (send_buffer);
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Sending address: %1%") % block_info->source.to_string ()));
node->logger.try_log (boost::str (boost::format ("Sending address: %1%") % block_info->source.to_string ()));
}
write (output_stream, block_info->source.bytes);
@ -696,9 +741,9 @@ void nano::bulk_pull_account_server::send_next_block ()
{
nano::vectorstream output_stream (send_buffer);
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Sending block: %1%") % block_info_key->hash.to_string ()));
node->logger.try_log (boost::str (boost::format ("Sending block: %1%") % block_info_key->hash.to_string ()));
}
write (output_stream, block_info_key->hash.bytes);
@ -723,9 +768,9 @@ void nano::bulk_pull_account_server::send_next_block ()
/*
* Otherwise, finalize the connection
*/
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Done sending blocks")));
node->logger.try_log (boost::str (boost::format ("Done sending blocks")));
}
send_finished ();
@ -734,6 +779,11 @@ void nano::bulk_pull_account_server::send_next_block ()
std::pair<std::unique_ptr<nano::pending_key>, std::unique_ptr<nano::pending_info>> nano::bulk_pull_account_server::get_next ()
{
auto node = connection->node.lock ();
if (!node)
{
return { nullptr, nullptr };
}
std::pair<std::unique_ptr<nano::pending_key>, std::unique_ptr<nano::pending_info>> result;
while (true)
@ -743,8 +793,8 @@ std::pair<std::unique_ptr<nano::pending_key>, std::unique_ptr<nano::pending_info
* destroy a database transaction, to avoid locking the
* database for a prolonged period.
*/
auto stream_transaction (connection->node->store.tx_begin_read ());
auto stream (connection->node->store.pending.begin (stream_transaction, current_key));
auto stream_transaction (node->store.tx_begin_read ());
auto stream (node->store.pending.begin (stream_transaction, current_key));
if (stream == nano::store_iterator<nano::pending_key, nano::pending_info> (nullptr))
{
@ -813,17 +863,22 @@ std::pair<std::unique_ptr<nano::pending_key>, std::unique_ptr<nano::pending_info
void nano::bulk_pull_account_server::sent_action (boost::system::error_code const & ec, std::size_t size_a)
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
if (!ec)
{
connection->node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () {
node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () {
this_l->send_next_block ();
});
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ()));
node->logger.try_log (boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ()));
}
}
}
@ -838,6 +893,11 @@ void nano::bulk_pull_account_server::send_finished ()
* "pending_include_address" flag is not set) or 640-bits of zeros
* (if that flag is set).
*/
auto node = connection->node.lock ();
if (!node)
{
return;
}
std::vector<uint8_t> send_buffer;
{
nano::vectorstream output_stream (send_buffer);
@ -858,9 +918,9 @@ void nano::bulk_pull_account_server::send_finished ()
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Bulk sending for an account finished");
node->logger.try_log ("Bulk sending for an account finished");
}
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l] (boost::system::error_code const & ec, std::size_t size_a) {
@ -870,6 +930,11 @@ void nano::bulk_pull_account_server::send_finished ()
void nano::bulk_pull_account_server::complete (boost::system::error_code const & ec, std::size_t size_a)
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
if (!ec)
{
if (pending_address_only)
@ -892,9 +957,9 @@ void nano::bulk_pull_account_server::complete (boost::system::error_code const &
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Unable to pending-as-zero");
node->logger.try_log ("Unable to pending-as-zero");
}
}
}

View file

@ -141,14 +141,19 @@ nano::bulk_push_server::bulk_push_server (std::shared_ptr<nano::transport::tcp_s
void nano::bulk_push_server::throttled_receive ()
{
if (!connection->node->block_processor.half_full ())
auto node = connection->node.lock ();
if (!node)
{
return;
}
if (!node->block_processor.half_full ())
{
receive ();
}
else
{
auto this_l (shared_from_this ());
connection->node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l] () {
node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l] () {
if (!this_l->connection->stopped)
{
this_l->throttled_receive ();
@ -159,26 +164,36 @@ void nano::bulk_push_server::throttled_receive ()
void nano::bulk_push_server::receive ()
{
if (connection->node->bootstrap_initiator.in_progress ())
auto node = connection->node.lock ();
if (!node)
{
if (connection->node->config.logging.bulk_pull_logging ())
return;
}
if (node->bootstrap_initiator.in_progress ())
{
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Aborting bulk_push because a bootstrap attempt is in progress");
node->logger.try_log ("Aborting bulk_push because a bootstrap attempt is in progress");
}
}
else
{
auto this_l (shared_from_this ());
connection->socket->async_read (receive_buffer, 1, [this_l] (boost::system::error_code const & ec, std::size_t size_a) {
auto node = this_l->connection->node.lock ();
if (!node)
{
return;
}
if (!ec)
{
this_l->received_type ();
}
else
{
if (this_l->connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error receiving block type: %1%") % ec.message ()));
node->logger.try_log (boost::str (boost::format ("Error receiving block type: %1%") % ec.message ()));
}
}
});
@ -187,13 +202,18 @@ void nano::bulk_push_server::receive ()
void nano::bulk_push_server::received_type ()
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
auto this_l (shared_from_this ());
nano::block_type type (static_cast<nano::block_type> (receive_buffer->data ()[0]));
switch (type)
{
case nano::block_type::send:
{
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::send, nano::stat::dir::in);
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::send, nano::stat::dir::in);
connection->socket->async_read (receive_buffer, nano::send_block::size, [this_l, type] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->received_block (ec, size_a, type);
});
@ -201,7 +221,7 @@ void nano::bulk_push_server::received_type ()
}
case nano::block_type::receive:
{
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::receive, nano::stat::dir::in);
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::receive, nano::stat::dir::in);
connection->socket->async_read (receive_buffer, nano::receive_block::size, [this_l, type] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->received_block (ec, size_a, type);
});
@ -209,7 +229,7 @@ void nano::bulk_push_server::received_type ()
}
case nano::block_type::open:
{
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::open, nano::stat::dir::in);
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::open, nano::stat::dir::in);
connection->socket->async_read (receive_buffer, nano::open_block::size, [this_l, type] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->received_block (ec, size_a, type);
});
@ -217,7 +237,7 @@ void nano::bulk_push_server::received_type ()
}
case nano::block_type::change:
{
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::change, nano::stat::dir::in);
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::change, nano::stat::dir::in);
connection->socket->async_read (receive_buffer, nano::change_block::size, [this_l, type] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->received_block (ec, size_a, type);
});
@ -225,7 +245,7 @@ void nano::bulk_push_server::received_type ()
}
case nano::block_type::state:
{
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::state_block, nano::stat::dir::in);
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::state_block, nano::stat::dir::in);
connection->socket->async_read (receive_buffer, nano::state_block::size, [this_l, type] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->received_block (ec, size_a, type);
});
@ -238,9 +258,9 @@ void nano::bulk_push_server::received_type ()
}
default:
{
if (connection->node->config.logging.network_packet_logging ())
if (node->config.logging.network_packet_logging ())
{
connection->node->logger.try_log ("Unknown type received as block type");
node->logger.try_log ("Unknown type received as block type");
}
break;
}
@ -249,29 +269,34 @@ void nano::bulk_push_server::received_type ()
void nano::bulk_push_server::received_block (boost::system::error_code const & ec, std::size_t size_a, nano::block_type type_a)
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
if (!ec)
{
nano::bufferstream stream (receive_buffer->data (), size_a);
auto block (nano::deserialize_block (stream, type_a));
if (block != nullptr)
{
if (connection->node->network_params.work.validate_entry (*block))
if (node->network_params.work.validate_entry (*block))
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Insufficient work for bulk push block: %1%") % block->hash ().to_string ()));
node->logger.try_log (boost::str (boost::format ("Insufficient work for bulk push block: %1%") % block->hash ().to_string ()));
}
connection->node->stats.inc_detail_only (nano::stat::type::error, nano::stat::detail::insufficient_work);
node->stats.inc_detail_only (nano::stat::type::error, nano::stat::detail::insufficient_work);
return;
}
connection->node->process_active (std::move (block));
node->process_active (std::move (block));
throttled_receive ();
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Error deserializing block received from pull request");
node->logger.try_log ("Error deserializing block received from pull request");
}
}
}

View file

@ -276,6 +276,11 @@ nano::frontier_req_server::frontier_req_server (std::shared_ptr<nano::transport:
void nano::frontier_req_server::send_next ()
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
if (!current.is_zero () && count < request->count)
{
std::vector<uint8_t> send_buffer;
@ -287,9 +292,9 @@ void nano::frontier_req_server::send_next ()
debug_assert (!frontier.is_zero ());
}
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Sending frontier for %1% %2%") % current.to_account () % frontier.to_string ()));
node->logger.try_log (boost::str (boost::format ("Sending frontier for %1% %2%") % current.to_account () % frontier.to_string ()));
}
next ();
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l] (boost::system::error_code const & ec, std::size_t size_a) {
@ -304,6 +309,11 @@ void nano::frontier_req_server::send_next ()
void nano::frontier_req_server::send_finished ()
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
std::vector<uint8_t> send_buffer;
{
nano::vectorstream stream (send_buffer);
@ -312,9 +322,9 @@ void nano::frontier_req_server::send_finished ()
write (stream, zero.bytes);
}
auto this_l (shared_from_this ());
if (connection->node->config.logging.network_logging ())
if (node->config.logging.network_logging ())
{
connection->node->logger.try_log ("Frontier sending finished");
node->logger.try_log ("Frontier sending finished");
}
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->no_block_sent (ec, size_a);
@ -323,50 +333,65 @@ void nano::frontier_req_server::send_finished ()
void nano::frontier_req_server::no_block_sent (boost::system::error_code const & ec, std::size_t size_a)
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
if (!ec)
{
connection->start ();
}
else
{
if (connection->node->config.logging.network_logging ())
if (node->config.logging.network_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Error sending frontier finish: %1%") % ec.message ()));
node->logger.try_log (boost::str (boost::format ("Error sending frontier finish: %1%") % ec.message ()));
}
}
}
void nano::frontier_req_server::sent_action (boost::system::error_code const & ec, std::size_t size_a)
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
if (!ec)
{
count++;
connection->node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () {
node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () {
this_l->send_next ();
});
}
else
{
if (connection->node->config.logging.network_logging ())
if (node->config.logging.network_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Error sending frontier pair: %1%") % ec.message ()));
node->logger.try_log (boost::str (boost::format ("Error sending frontier pair: %1%") % ec.message ()));
}
}
}
void nano::frontier_req_server::next ()
{
auto node = connection->node.lock ();
if (!node)
{
return;
}
// Filling accounts deque to prevent often read transactions
if (accounts.empty ())
{
auto now (nano::seconds_since_epoch ());
bool disable_age_filter (request->age == std::numeric_limits<decltype (request->age)>::max ());
std::size_t max_size (128);
auto transaction (connection->node->store.tx_begin_read ());
auto transaction (node->store.tx_begin_read ());
if (!send_confirmed ())
{
for (auto i (connection->node->store.account.begin (transaction, current.number () + 1)), n (connection->node->store.account.end ()); i != n && accounts.size () != max_size; ++i)
for (auto i (node->store.account.begin (transaction, current.number () + 1)), n (node->store.account.end ()); i != n && accounts.size () != max_size; ++i)
{
nano::account_info const & info (i->second);
if (disable_age_filter || (now - info.modified) <= request->age)
@ -378,7 +403,7 @@ void nano::frontier_req_server::next ()
}
else
{
for (auto i (connection->node->store.confirmation_height.begin (transaction, current.number () + 1)), n (connection->node->store.confirmation_height.end ()); i != n && accounts.size () != max_size; ++i)
for (auto i (node->store.confirmation_height.begin (transaction, current.number () + 1)), n (node->store.confirmation_height.end ()); i != n && accounts.size () != max_size; ++i)
{
nano::confirmation_height_info const & info (i->second);
nano::block_hash const & confirmed_frontier (info.frontier);

View file

@ -136,7 +136,7 @@ nano::transport::tcp_server::tcp_server (std::shared_ptr<nano::transport::socket
node{ std::move (node_a) },
allow_bootstrap{ allow_bootstrap_a },
message_deserializer{
std::make_shared<nano::transport::message_deserializer> (node->network_params.network, node->network.publish_filter, node->block_uniquer, node->vote_uniquer,
std::make_shared<nano::transport::message_deserializer> (node_a->network_params.network, node_a->network.publish_filter, node_a->block_uniquer, node_a->vote_uniquer,
[socket_l = socket] (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
debug_assert (socket_l != nullptr);
socket_l->read_impl (data_a, size_a, callback_a);
@ -148,6 +148,11 @@ nano::transport::tcp_server::tcp_server (std::shared_ptr<nano::transport::socket
nano::transport::tcp_server::~tcp_server ()
{
auto node = this->node.lock ();
if (!node)
{
return;
}
if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log ("Exiting incoming TCP/bootstrap server");
@ -203,10 +208,15 @@ void nano::transport::tcp_server::receive_message ()
}
message_deserializer->read ([this_l = shared_from_this ()] (boost::system::error_code ec, std::unique_ptr<nano::message> message) {
auto node = this_l->node.lock ();
if (!node)
{
return;
}
if (ec)
{
// IO error or critical error when deserializing message
this_l->node->stats.inc (nano::stat::type::error, nano::transport::message_deserializer::to_stat_detail (this_l->message_deserializer->status));
node->stats.inc (nano::stat::type::error, nano::transport::message_deserializer::to_stat_detail (this_l->message_deserializer->status));
this_l->stop ();
}
else
@ -218,6 +228,11 @@ void nano::transport::tcp_server::receive_message ()
void nano::transport::tcp_server::received_message (std::unique_ptr<nano::message> message)
{
auto node = this->node.lock ();
if (!node)
{
return;
}
bool should_continue = true;
if (message)
{
@ -242,6 +257,11 @@ void nano::transport::tcp_server::received_message (std::unique_ptr<nano::messag
bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message> message)
{
auto node = this->node.lock ();
if (!node)
{
return false;
}
node->stats.inc (nano::stat::type::tcp_server, nano::to_stat_detail (message->header.type), nano::stat::dir::in);
debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ());
@ -303,6 +323,11 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
void nano::transport::tcp_server::queue_realtime (std::unique_ptr<nano::message> message)
{
auto node = this->node.lock ();
if (!node)
{
return;
}
node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket });
}
@ -317,11 +342,16 @@ nano::transport::tcp_server::handshake_message_visitor::handshake_message_visito
void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (nano::node_id_handshake const & message)
{
if (server->node->flags.disable_tcp_realtime)
auto node = server->node.lock ();
if (!node)
{
if (server->node->config.logging.network_node_id_handshake_logging ())
return;
}
if (node->flags.disable_tcp_realtime)
{
if (node->config.logging.network_node_id_handshake_logging ())
{
server->node->logger.try_log (boost::str (boost::format ("Disabled realtime TCP for handshake %1%") % server->remote_endpoint));
node->logger.try_log (boost::str (boost::format ("Disabled realtime TCP for handshake %1%") % server->remote_endpoint));
}
// Stop invalid handshake
server->stop ();
@ -330,9 +360,9 @@ void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (
if (message.query && server->handshake_query_received)
{
if (server->node->config.logging.network_node_id_handshake_logging ())
if (node->config.logging.network_node_id_handshake_logging ())
{
server->node->logger.try_log (boost::str (boost::format ("Detected multiple node_id_handshake query from %1%") % server->remote_endpoint));
node->logger.try_log (boost::str (boost::format ("Detected multiple node_id_handshake query from %1%") % server->remote_endpoint));
}
// Stop invalid handshake
server->stop ();
@ -341,9 +371,9 @@ void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (
server->handshake_query_received = true;
if (server->node->config.logging.network_node_id_handshake_logging ())
if (node->config.logging.network_node_id_handshake_logging ())
{
server->node->logger.try_log (boost::str (boost::format ("Received node_id_handshake message from %1%") % server->remote_endpoint));
node->logger.try_log (boost::str (boost::format ("Received node_id_handshake message from %1%") % server->remote_endpoint));
}
if (message.query)
@ -352,7 +382,7 @@ void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (
}
if (message.response)
{
if (server->node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint)))
if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint)))
{
server->to_realtime_connection (message.response->node_id);
}
@ -369,6 +399,11 @@ void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (
void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2)
{
auto node = this->node.lock ();
if (!node)
{
return;
}
auto response = node->network.prepare_handshake_response (query, v2);
auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response };
@ -376,18 +411,23 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
// TODO: Use channel
auto shared_const_buffer = handshake_response.to_shared_const_buffer ();
socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
auto node = this_l->node.lock ();
if (!node)
{
return;
}
if (ec)
{
if (this_l->node->config.logging.network_node_id_handshake_logging ())
if (node->config.logging.network_node_id_handshake_logging ())
{
this_l->node->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % this_l->remote_endpoint % ec.message ()));
node->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % this_l->remote_endpoint % ec.message ()));
}
// Stop invalid handshake
this_l->stop ();
}
else
{
this_l->node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out);
node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out);
}
});
}
@ -448,15 +488,20 @@ void nano::transport::tcp_server::realtime_message_visitor::frontier_req (const
void nano::transport::tcp_server::realtime_message_visitor::telemetry_req (const nano::telemetry_req & message)
{
auto node = server.node.lock ();
if (!node)
{
return;
}
// Only handle telemetry requests if they are outside the cooldown period
if (server.last_telemetry_req + server.node->network_params.network.telemetry_request_cooldown < std::chrono::steady_clock::now ())
if (server.last_telemetry_req + node->network_params.network.telemetry_request_cooldown < std::chrono::steady_clock::now ())
{
server.last_telemetry_req = std::chrono::steady_clock::now ();
process = true;
}
else
{
server.node->stats.inc (nano::stat::type::telemetry, nano::stat::detail::request_within_protection_cache_zone);
node->stats.inc (nano::stat::type::telemetry, nano::stat::detail::request_within_protection_cache_zone);
}
}
@ -486,17 +531,22 @@ nano::transport::tcp_server::bootstrap_message_visitor::bootstrap_message_visito
void nano::transport::tcp_server::bootstrap_message_visitor::bulk_pull (const nano::bulk_pull & message)
{
if (server->node->flags.disable_bootstrap_bulk_pull_server)
auto node = server->node.lock ();
if (!node)
{
return;
}
if (node->flags.disable_bootstrap_bulk_pull_server)
{
return;
}
if (server->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
server->node->logger.try_log (boost::str (boost::format ("Received bulk pull for %1% down to %2%, maximum of %3% from %4%") % message.start.to_string () % message.end.to_string () % message.count % server->remote_endpoint));
node->logger.try_log (boost::str (boost::format ("Received bulk pull for %1% down to %2%, maximum of %3% from %4%") % message.start.to_string () % message.end.to_string () % message.count % server->remote_endpoint));
}
server->node->bootstrap_workers.push_task ([server = server, message = message] () {
node->bootstrap_workers.push_task ([server = server, message = message] () {
// TODO: Add completion callback to bulk pull server
// TODO: There should be no need to re-copy message as unique pointer, refactor those bulk/frontier pull/push servers
auto bulk_pull_server = std::make_shared<nano::bulk_pull_server> (server, std::make_unique<nano::bulk_pull> (message));
@ -508,17 +558,22 @@ void nano::transport::tcp_server::bootstrap_message_visitor::bulk_pull (const na
void nano::transport::tcp_server::bootstrap_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message)
{
if (server->node->flags.disable_bootstrap_bulk_pull_server)
auto node = server->node.lock ();
if (!node)
{
return;
}
if (node->flags.disable_bootstrap_bulk_pull_server)
{
return;
}
if (server->node->config.logging.bulk_pull_logging ())
if (node->config.logging.bulk_pull_logging ())
{
server->node->logger.try_log (boost::str (boost::format ("Received bulk pull account for %1% with a minimum amount of %2%") % message.account.to_account () % nano::amount (message.minimum_amount).format_balance (nano::Mxrb_ratio, 10, true)));
node->logger.try_log (boost::str (boost::format ("Received bulk pull account for %1% with a minimum amount of %2%") % message.account.to_account () % nano::amount (message.minimum_amount).format_balance (nano::Mxrb_ratio, 10, true)));
}
server->node->bootstrap_workers.push_task ([server = server, message = message] () {
node->bootstrap_workers.push_task ([server = server, message = message] () {
// TODO: Add completion callback to bulk pull server
// TODO: There should be no need to re-copy message as unique pointer, refactor those bulk/frontier pull/push servers
auto bulk_pull_account_server = std::make_shared<nano::bulk_pull_account_server> (server, std::make_unique<nano::bulk_pull_account> (message));
@ -530,7 +585,12 @@ void nano::transport::tcp_server::bootstrap_message_visitor::bulk_pull_account (
void nano::transport::tcp_server::bootstrap_message_visitor::bulk_push (const nano::bulk_push &)
{
server->node->bootstrap_workers.push_task ([server = server] () {
auto node = server->node.lock ();
if (!node)
{
return;
}
node->bootstrap_workers.push_task ([server = server] () {
// TODO: Add completion callback to bulk pull server
auto bulk_push_server = std::make_shared<nano::bulk_push_server> (server);
bulk_push_server->throttled_receive ();
@ -541,12 +601,17 @@ void nano::transport::tcp_server::bootstrap_message_visitor::bulk_push (const na
void nano::transport::tcp_server::bootstrap_message_visitor::frontier_req (const nano::frontier_req & message)
{
if (server->node->config.logging.bulk_pull_logging ())
auto node = server->node.lock ();
if (!node)
{
server->node->logger.try_log (boost::str (boost::format ("Received frontier request for %1% with age %2%") % message.start.to_string () % message.age));
return;
}
if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log (boost::str (boost::format ("Received frontier request for %1% with age %2%") % message.start.to_string () % message.age));
}
server->node->bootstrap_workers.push_task ([server = server, message = message] () {
node->bootstrap_workers.push_task ([server = server, message = message] () {
// TODO: There should be no need to re-copy message as unique pointer, refactor those bulk/frontier pull/push servers
auto response = std::make_shared<nano::frontier_req_server> (server, std::make_unique<nano::frontier_req> (message));
response->send_next ();
@ -559,6 +624,11 @@ void nano::transport::tcp_server::bootstrap_message_visitor::frontier_req (const
// and since we only ever store tcp_server as weak_ptr, socket timeout will automatically trigger tcp_server cleanup
void nano::transport::tcp_server::timeout ()
{
auto node = this->node.lock ();
if (!node)
{
return;
}
if (socket->has_timed_out ())
{
if (node->config.logging.bulk_pull_logging ())
@ -575,6 +645,11 @@ void nano::transport::tcp_server::timeout ()
bool nano::transport::tcp_server::to_bootstrap_connection ()
{
auto node = this->node.lock ();
if (!node)
{
return false;
}
if (!allow_bootstrap)
{
return false;
@ -599,6 +674,11 @@ bool nano::transport::tcp_server::to_bootstrap_connection ()
bool nano::transport::tcp_server::to_realtime_connection (nano::account const & node_id)
{
auto node = this->node.lock ();
if (!node)
{
return false;
}
if (socket->type () == nano::transport::socket::type_t::undefined && !node->flags.disable_tcp_realtime)
{
remote_node_id = node_id;

View file

@ -53,7 +53,7 @@ public:
void timeout ();
std::shared_ptr<nano::transport::socket> const socket;
std::shared_ptr<nano::node> const node;
std::weak_ptr<nano::node> const node;
nano::mutex mutex;
std::atomic<bool> stopped{ false };
std::atomic<bool> handshake_query_received{ false };