Better move semantics & fix clang-tidy warnings (#3606)
This commit is contained in:
parent
afd5f033ab
commit
e573cd7252
11 changed files with 115 additions and 128 deletions
|
@ -199,9 +199,8 @@ public:
|
|||
fake_work_peer () = delete;
|
||||
fake_work_peer (nano::work_pool & pool_a, asio::io_context & ioc_a, unsigned short port_a, work_peer_type const type_a, nano::work_version const version_a = nano::work_version::work_1) :
|
||||
pool (pool_a),
|
||||
endpoint (tcp::v4 (), port_a),
|
||||
ioc (ioc_a),
|
||||
acceptor (ioc_a, endpoint),
|
||||
acceptor (ioc_a, tcp::endpoint{ tcp::v4 (), port_a }),
|
||||
type (type_a),
|
||||
version (version_a)
|
||||
{
|
||||
|
@ -254,8 +253,8 @@ private:
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
nano::work_pool & pool;
|
||||
tcp::endpoint endpoint;
|
||||
asio::io_context & ioc;
|
||||
tcp::acceptor acceptor;
|
||||
work_peer_type const type;
|
||||
|
|
|
@ -46,7 +46,7 @@ class socket_client : public nano::ipc::socket_base, public channel, public std:
|
|||
{
|
||||
public:
|
||||
socket_client (boost::asio::io_context & io_ctx_a, ENDPOINT_TYPE endpoint_a) :
|
||||
socket_base (io_ctx_a), endpoint (endpoint_a), socket (io_ctx_a), resolver (io_ctx_a), strand (io_ctx_a.get_executor ())
|
||||
socket_base (io_ctx_a), endpoint (std::move (endpoint_a)), socket (io_ctx_a), resolver (io_ctx_a), strand (io_ctx_a.get_executor ())
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -54,17 +54,17 @@ public:
|
|||
{
|
||||
auto this_l (this->shared_from_this ());
|
||||
this_l->timer_start (io_timeout);
|
||||
resolver.async_resolve (boost::asio::ip::tcp::resolver::query (host_a, std::to_string (port_a)), [this_l, callback_a] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator endpoint_iterator_a) {
|
||||
resolver.async_resolve (boost::asio::ip::tcp::resolver::query (host_a, std::to_string (port_a)), [this_l, callback = std::move (callback_a)] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator endpoint_iterator_a) {
|
||||
this_l->timer_cancel ();
|
||||
boost::asio::ip::tcp::resolver::iterator end;
|
||||
if (!ec && endpoint_iterator_a != end)
|
||||
{
|
||||
this_l->endpoint = *endpoint_iterator_a;
|
||||
callback_a (ec, *endpoint_iterator_a);
|
||||
callback (ec, *endpoint_iterator_a);
|
||||
}
|
||||
else
|
||||
{
|
||||
callback_a (ec, *end);
|
||||
callback (ec, *end);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -73,9 +73,9 @@ public:
|
|||
{
|
||||
auto this_l (this->shared_from_this ());
|
||||
this_l->timer_start (io_timeout);
|
||||
socket.async_connect (endpoint, boost::asio::bind_executor (strand, [this_l, callback_a] (boost::system::error_code const & ec) {
|
||||
socket.async_connect (endpoint, boost::asio::bind_executor (strand, [this_l, callback = std::move (callback_a)] (boost::system::error_code const & ec) {
|
||||
this_l->timer_cancel ();
|
||||
callback_a (ec);
|
||||
callback (ec);
|
||||
}));
|
||||
}
|
||||
|
||||
|
@ -107,6 +107,8 @@ public:
|
|||
}));
|
||||
}
|
||||
|
||||
// TODO: investigate clang-tidy warning about recursive call chain
|
||||
//
|
||||
void write_queued_messages ()
|
||||
{
|
||||
auto this_l (this->shared_from_this ());
|
||||
|
@ -171,8 +173,8 @@ private:
|
|||
class queue_item
|
||||
{
|
||||
public:
|
||||
queue_item (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, size_t)> callback_a) :
|
||||
buffer (buffer_a), callback (callback_a)
|
||||
queue_item (nano::shared_const_buffer buffer_a, std::function<void (boost::system::error_code const &, size_t)> callback_a) :
|
||||
buffer (std::move (buffer_a)), callback (std::move (callback_a))
|
||||
{
|
||||
}
|
||||
nano::shared_const_buffer buffer;
|
||||
|
@ -205,16 +207,16 @@ public:
|
|||
{
|
||||
tcp_client = std::make_shared<socket_client<socket_type, boost::asio::ip::tcp::endpoint>> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a));
|
||||
|
||||
tcp_client->async_resolve (host_a, port_a, [this, callback_a] (boost::system::error_code const & ec_resolve_a, boost::asio::ip::tcp::endpoint endpoint_a) {
|
||||
tcp_client->async_resolve (host_a, port_a, [this, callback = std::move (callback_a)] (boost::system::error_code const & ec_resolve_a, boost::asio::ip::tcp::endpoint const &) mutable {
|
||||
if (!ec_resolve_a)
|
||||
{
|
||||
this->tcp_client->async_connect ([callback_a] (boost::system::error_code const & ec_connect_a) {
|
||||
callback_a (nano::error (ec_connect_a));
|
||||
this->tcp_client->async_connect ([cbk = std::move (callback)] (boost::system::error_code const & ec_connect_a) {
|
||||
cbk (nano::error (ec_connect_a));
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
callback_a (nano::error (ec_resolve_a));
|
||||
callback (nano::error (ec_resolve_a));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -263,14 +265,14 @@ void nano::ipc::ipc_client::async_connect (std::string const & host_a, uint16_t
|
|||
{
|
||||
impl = std::make_unique<client_impl> (io_ctx);
|
||||
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
|
||||
client->connect (host_a, port_a, callback_a);
|
||||
client->connect (host_a, port_a, std::move (callback_a));
|
||||
}
|
||||
|
||||
nano::error nano::ipc::ipc_client::connect (std::string const & host, uint16_t port)
|
||||
{
|
||||
std::promise<nano::error> result_l;
|
||||
async_connect (host, port, [&result_l] (nano::error err_a) {
|
||||
result_l.set_value (err_a);
|
||||
result_l.set_value (std::move (err_a));
|
||||
});
|
||||
return result_l.get_future ().get ();
|
||||
}
|
||||
|
@ -278,16 +280,16 @@ nano::error nano::ipc::ipc_client::connect (std::string const & host, uint16_t p
|
|||
void nano::ipc::ipc_client::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (nano::error, size_t)> callback_a)
|
||||
{
|
||||
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
|
||||
client->get_channel ().async_write (buffer_a, [callback_a] (boost::system::error_code const & ec_a, size_t bytes_written_a) {
|
||||
callback_a (nano::error (ec_a), bytes_written_a);
|
||||
client->get_channel ().async_write (buffer_a, [callback = std::move (callback_a)] (boost::system::error_code const & ec_a, size_t bytes_written_a) {
|
||||
callback (nano::error (ec_a), bytes_written_a);
|
||||
});
|
||||
}
|
||||
|
||||
void nano::ipc::ipc_client::async_read (std::shared_ptr<std::vector<uint8_t>> const & buffer_a, size_t size_a, std::function<void (nano::error, size_t)> callback_a)
|
||||
{
|
||||
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
|
||||
client->get_channel ().async_read (buffer_a, size_a, [callback_a, buffer_a] (boost::system::error_code const & ec_a, size_t bytes_read_a) {
|
||||
callback_a (nano::error (ec_a), bytes_read_a);
|
||||
client->get_channel ().async_read (buffer_a, size_a, [callback = std::move (callback_a), buffer_a] (boost::system::error_code const & ec_a, size_t bytes_read_a) {
|
||||
callback (nano::error (ec_a), bytes_read_a);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -295,8 +297,8 @@ void nano::ipc::ipc_client::async_read (std::shared_ptr<std::vector<uint8_t>> co
|
|||
void nano::ipc::ipc_client::async_read_message (std::shared_ptr<std::vector<uint8_t>> const & buffer_a, std::chrono::seconds timeout_a, std::function<void (nano::error, size_t)> callback_a)
|
||||
{
|
||||
auto client (boost::polymorphic_downcast<client_impl *> (impl.get ()));
|
||||
client->get_channel ().async_read_message (buffer_a, timeout_a, [callback_a, buffer_a] (boost::system::error_code const & ec_a, size_t bytes_read_a) {
|
||||
callback_a (nano::error (ec_a), bytes_read_a);
|
||||
client->get_channel ().async_read_message (buffer_a, timeout_a, [callback = std::move (callback_a), buffer_a] (boost::system::error_code const & ec_a, size_t bytes_read_a) {
|
||||
callback (nano::error (ec_a), bytes_read_a);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -342,12 +344,12 @@ std::string nano::ipc::request (nano::ipc::payload_encoding encoding_a, nano::ip
|
|||
auto res (std::make_shared<std::vector<uint8_t>> ());
|
||||
|
||||
std::promise<std::string> result_l;
|
||||
ipc_client.async_write (req, [&ipc_client, &res, &result_l] (nano::error err_a, size_t size_a) {
|
||||
ipc_client.async_write (req, [&ipc_client, &res, &result_l] (nano::error const &, size_t size_a) {
|
||||
// Read length
|
||||
ipc_client.async_read (res, sizeof (uint32_t), [&ipc_client, &res, &result_l] (nano::error err_read_a, size_t size_read_a) {
|
||||
ipc_client.async_read (res, sizeof (uint32_t), [&ipc_client, &res, &result_l] (nano::error const &, size_t size_read_a) {
|
||||
uint32_t payload_size_l = boost::endian::big_to_native (*reinterpret_cast<uint32_t *> (res->data ()));
|
||||
// Read json payload
|
||||
ipc_client.async_read (res, payload_size_l, [&res, &result_l] (nano::error err_read_a, size_t size_read_a) {
|
||||
ipc_client.async_read (res, payload_size_l, [&res, &result_l] (nano::error const &, size_t size_read_a) {
|
||||
result_l.set_value (std::string (res->begin (), res->end ()));
|
||||
});
|
||||
});
|
||||
|
|
|
@ -179,7 +179,7 @@ void nano::bootstrap_connections::connect_client (nano::tcp_endpoint const & end
|
|||
});
|
||||
}
|
||||
|
||||
unsigned nano::bootstrap_connections::target_connections (std::size_t pulls_remaining, std::size_t attempts_count)
|
||||
unsigned nano::bootstrap_connections::target_connections (std::size_t pulls_remaining, std::size_t attempts_count) const
|
||||
{
|
||||
auto const attempts_factor = nano::narrow_cast<unsigned> (node.config.bootstrap_connections * attempts_count);
|
||||
if (attempts_factor >= node.config.bootstrap_connections_max)
|
||||
|
@ -477,7 +477,7 @@ void nano::bootstrap_connections::stop ()
|
|||
lock.unlock ();
|
||||
condition.notify_all ();
|
||||
lock.lock ();
|
||||
for (auto i : clients)
|
||||
for (auto const & i : clients)
|
||||
{
|
||||
if (auto client = i.lock ())
|
||||
{
|
||||
|
|
|
@ -23,7 +23,6 @@ class bootstrap_client final : public std::enable_shared_from_this<bootstrap_cli
|
|||
public:
|
||||
bootstrap_client (std::shared_ptr<nano::node> const & node_a, nano::bootstrap_connections & connections_a, std::shared_ptr<nano::transport::channel_tcp> const & channel_a, std::shared_ptr<nano::socket> const & socket_a);
|
||||
~bootstrap_client ();
|
||||
std::shared_ptr<nano::bootstrap_client> shared ();
|
||||
void stop (bool force);
|
||||
double sample_block_rate ();
|
||||
double elapsed_seconds () const;
|
||||
|
@ -46,14 +45,13 @@ private:
|
|||
class bootstrap_connections final : public std::enable_shared_from_this<bootstrap_connections>
|
||||
{
|
||||
public:
|
||||
bootstrap_connections (nano::node & node_a);
|
||||
std::shared_ptr<nano::bootstrap_connections> shared ();
|
||||
explicit bootstrap_connections (nano::node & node_a);
|
||||
std::shared_ptr<nano::bootstrap_client> connection (std::shared_ptr<nano::bootstrap_attempt> const & attempt_a = nullptr, bool use_front_connection = false);
|
||||
void pool_connection (std::shared_ptr<nano::bootstrap_client> const & client_a, bool new_client = false, bool push_front = false);
|
||||
void add_connection (nano::endpoint const & endpoint_a);
|
||||
std::shared_ptr<nano::bootstrap_client> find_connection (nano::tcp_endpoint const & endpoint_a);
|
||||
void connect_client (nano::tcp_endpoint const & endpoint_a, bool push_front = false);
|
||||
unsigned target_connections (std::size_t pulls_remaining, std::size_t attempts_count);
|
||||
unsigned target_connections (std::size_t pulls_remaining, std::size_t attempts_count) const;
|
||||
void populate_connections (bool repeat = true);
|
||||
void start_populate_connections ();
|
||||
void add_pull (nano::pull_info const & pull_a);
|
||||
|
|
|
@ -14,6 +14,26 @@
|
|||
#include <iterator>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
|
||||
namespace
|
||||
{
|
||||
bool is_temporary_error (boost::system::error_code const & ec_a)
|
||||
{
|
||||
switch (ec_a.value ())
|
||||
{
|
||||
#if EAGAIN != EWOULDBLOCK
|
||||
case EAGAIN:
|
||||
#endif
|
||||
|
||||
case EWOULDBLOCK:
|
||||
case EINTR:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nano::socket::socket (nano::node & node_a) :
|
||||
strand{ node_a.io_ctx.get_executor () },
|
||||
|
@ -39,10 +59,10 @@ void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::fu
|
|||
start_timer ();
|
||||
this_l->tcp_socket.async_connect (endpoint_a,
|
||||
boost::asio::bind_executor (this_l->strand,
|
||||
[this_l, callback_a, endpoint_a] (boost::system::error_code const & ec) {
|
||||
[this_l, callback = std::move (callback_a), endpoint_a] (boost::system::error_code const & ec) {
|
||||
this_l->stop_timer ();
|
||||
this_l->remote = endpoint_a;
|
||||
callback_a (ec);
|
||||
callback (ec);
|
||||
}));
|
||||
}
|
||||
|
||||
|
@ -54,14 +74,14 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buf
|
|||
if (!closed)
|
||||
{
|
||||
start_timer ();
|
||||
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, size_a, this_l] () {
|
||||
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback = std::move (callback_a), size_a, this_l] () mutable {
|
||||
boost::asio::async_read (this_l->tcp_socket, boost::asio::buffer (buffer_a->data (), size_a),
|
||||
boost::asio::bind_executor (this_l->strand,
|
||||
[this_l, buffer_a, callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
[this_l, buffer_a, cbk = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a);
|
||||
this_l->stop_timer ();
|
||||
this_l->update_last_receive_time ();
|
||||
callback_a (ec, size_a);
|
||||
cbk (ec, size_a);
|
||||
}));
|
||||
}));
|
||||
}
|
||||
|
@ -74,40 +94,40 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buf
|
|||
}
|
||||
}
|
||||
|
||||
void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a)
|
||||
void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
|
||||
{
|
||||
if (!closed)
|
||||
{
|
||||
++queue_size;
|
||||
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l = shared_from_this ()] () {
|
||||
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback = std::move (callback_a), this_l = shared_from_this ()] () mutable {
|
||||
if (!this_l->closed)
|
||||
{
|
||||
this_l->start_timer ();
|
||||
nano::async_write (this_l->tcp_socket, buffer_a,
|
||||
boost::asio::bind_executor (this_l->strand,
|
||||
[buffer_a, callback_a, this_l] (boost::system::error_code ec, std::size_t size_a) {
|
||||
[buffer_a, cbk = std::move (callback), this_l] (boost::system::error_code ec, std::size_t size_a) {
|
||||
--this_l->queue_size;
|
||||
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
|
||||
this_l->stop_timer ();
|
||||
if (callback_a)
|
||||
if (cbk)
|
||||
{
|
||||
callback_a (ec, size_a);
|
||||
cbk (ec, size_a);
|
||||
}
|
||||
}));
|
||||
}
|
||||
else
|
||||
else if (callback)
|
||||
{
|
||||
if (callback_a)
|
||||
{
|
||||
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
||||
}
|
||||
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
||||
}
|
||||
}));
|
||||
}
|
||||
else if (callback_a)
|
||||
else
|
||||
{
|
||||
node.background ([callback_a] () {
|
||||
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
||||
node.background ([callback = std::move (callback_a)] () {
|
||||
if (callback)
|
||||
{
|
||||
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -231,7 +251,7 @@ nano::tcp_endpoint nano::socket::local_endpoint () const
|
|||
nano::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, std::size_t max_connections_a) :
|
||||
socket{ node_a },
|
||||
acceptor{ node_a.io_ctx },
|
||||
local{ local_a },
|
||||
local{ std::move (local_a) },
|
||||
max_inbound_connections{ max_connections_a }
|
||||
{
|
||||
io_timeout = std::chrono::seconds::max ();
|
||||
|
@ -334,7 +354,7 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
|
|||
{
|
||||
auto this_l (std::static_pointer_cast<nano::server_socket> (shared_from_this ()));
|
||||
|
||||
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, callback_a] () {
|
||||
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, callback = std::move (callback_a)] () mutable {
|
||||
if (!this_l->acceptor.is_open ())
|
||||
{
|
||||
this_l->node.logger.always_log ("Network: Acceptor is not open");
|
||||
|
@ -345,14 +365,14 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
|
|||
auto new_connection = std::make_shared<nano::socket> (this_l->node);
|
||||
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
|
||||
boost::asio::bind_executor (this_l->strand,
|
||||
[this_l, new_connection, callback_a] (boost::system::error_code const & ec_a) {
|
||||
[this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable {
|
||||
this_l->evict_dead_connections ();
|
||||
|
||||
if (this_l->connections_per_address.size () >= this_l->max_inbound_connections)
|
||||
{
|
||||
this_l->node.logger.try_log ("Network: max_inbound_connections reached, unable to open new connection");
|
||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
|
||||
this_l->on_connection_requeue_delayed (callback_a);
|
||||
this_l->on_connection_requeue_delayed (std::move (cbk));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -364,7 +384,7 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
|
|||
% remote_ip_address.to_string ());
|
||||
this_l->node.logger.try_log (log_message);
|
||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in);
|
||||
this_l->on_connection_requeue_delayed (callback_a);
|
||||
this_l->on_connection_requeue_delayed (std::move (cbk));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -379,7 +399,7 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
|
|||
% remote_ip_address.to_string ());
|
||||
this_l->node.logger.try_log (log_message);
|
||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in);
|
||||
this_l->on_connection_requeue_delayed (callback_a);
|
||||
this_l->on_connection_requeue_delayed (std::move (cbk));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -391,9 +411,9 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
|
|||
new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? std::chrono::seconds (2) : this_l->node.network_params.network.idle_timeout);
|
||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
|
||||
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
|
||||
if (callback_a (new_connection, ec_a))
|
||||
if (cbk (new_connection, ec_a))
|
||||
{
|
||||
this_l->on_connection (callback_a);
|
||||
this_l->on_connection (std::move (cbk));
|
||||
return;
|
||||
}
|
||||
this_l->node.logger.always_log ("Network: Stopping to accept connections");
|
||||
|
@ -404,17 +424,17 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
|
|||
this_l->node.logger.try_log ("Network: Unable to accept connection: ", ec_a.message ());
|
||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
|
||||
|
||||
if (this_l->is_temporary_error (ec_a))
|
||||
if (is_temporary_error (ec_a))
|
||||
{
|
||||
// if it is a temporary error, just retry it
|
||||
this_l->on_connection_requeue_delayed (callback_a);
|
||||
this_l->on_connection_requeue_delayed (std::move (cbk));
|
||||
return;
|
||||
}
|
||||
|
||||
// if it is not a temporary error, check how the listener wants to handle this error
|
||||
if (callback_a (new_connection, ec_a))
|
||||
if (cbk (new_connection, ec_a))
|
||||
{
|
||||
this_l->on_connection_requeue_delayed (callback_a);
|
||||
this_l->on_connection_requeue_delayed (std::move (cbk));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -430,26 +450,11 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
|
|||
void nano::server_socket::on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::socket> const &, boost::system::error_code const &)> callback_a)
|
||||
{
|
||||
auto this_l (std::static_pointer_cast<nano::server_socket> (shared_from_this ()));
|
||||
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l, callback_a] () {
|
||||
this_l->on_connection (callback_a);
|
||||
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l, callback = std::move (callback_a)] () mutable {
|
||||
this_l->on_connection (std::move (callback));
|
||||
});
|
||||
}
|
||||
|
||||
bool nano::server_socket::is_temporary_error (boost::system::error_code const ec_a)
|
||||
{
|
||||
switch (ec_a.value ())
|
||||
{
|
||||
#if EAGAIN != EWOULDBLOCK
|
||||
case EAGAIN:
|
||||
#endif
|
||||
case EWOULDBLOCK:
|
||||
case EINTR:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// This must be called from a strand
|
||||
void nano::server_socket::evict_dead_connections ()
|
||||
{
|
||||
|
|
|
@ -62,7 +62,7 @@ public:
|
|||
virtual ~socket ();
|
||||
void async_connect (boost::asio::ip::tcp::endpoint const &, std::function<void (boost::system::error_code const &)>);
|
||||
void async_read (std::shared_ptr<std::vector<uint8_t>> const &, std::size_t, std::function<void (boost::system::error_code const &, std::size_t)>);
|
||||
void async_write (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr);
|
||||
void async_write (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> = {});
|
||||
|
||||
void close ();
|
||||
boost::asio::ip::tcp::endpoint remote_endpoint () const;
|
||||
|
@ -177,7 +177,6 @@ private:
|
|||
boost::asio::ip::tcp::endpoint local;
|
||||
std::size_t max_inbound_connections;
|
||||
void evict_dead_connections ();
|
||||
bool is_temporary_error (boost::system::error_code const ec_a);
|
||||
void on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::socket> const & new_connection, boost::system::error_code const &)>);
|
||||
/** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */
|
||||
bool limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::socket> const & new_connection);
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::weak_ptr<nano::socket> socket_a) :
|
||||
channel (node_a),
|
||||
socket (socket_a)
|
||||
socket (std::move (socket_a))
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -107,7 +107,7 @@ void nano::transport::channel_tcp::set_endpoint ()
|
|||
|
||||
nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink) :
|
||||
node{ node },
|
||||
sink{ sink }
|
||||
sink{ std::move (sink) }
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -228,10 +228,9 @@ bool nano::transport::tcp_channels::store_all (bool clear_peers)
|
|||
{
|
||||
node.store.peer.clear (transaction);
|
||||
}
|
||||
for (auto endpoint : endpoints)
|
||||
for (auto const & endpoint : endpoints)
|
||||
{
|
||||
nano::endpoint_key endpoint_key (endpoint.address ().to_v6 ().to_bytes (), endpoint.port ());
|
||||
node.store.peer.put (transaction, std::move (endpoint_key));
|
||||
node.store.peer.put (transaction, nano::endpoint_key{ endpoint.address ().to_v6 ().to_bytes (), endpoint.port () });
|
||||
}
|
||||
result = true;
|
||||
}
|
||||
|
@ -345,16 +344,16 @@ void nano::transport::tcp_channels::stop ()
|
|||
stopped = true;
|
||||
nano::unique_lock<nano::mutex> lock (mutex);
|
||||
// Close all TCP sockets
|
||||
for (auto i (channels.begin ()), j (channels.end ()); i != j; ++i)
|
||||
for (auto const & channel : channels)
|
||||
{
|
||||
if (i->socket)
|
||||
if (channel.socket)
|
||||
{
|
||||
i->socket->close ();
|
||||
channel.socket->close ();
|
||||
}
|
||||
// Remove response server
|
||||
if (i->response_server)
|
||||
if (channel.response_server)
|
||||
{
|
||||
i->response_server->stop ();
|
||||
channel.response_server->stop ();
|
||||
}
|
||||
}
|
||||
channels.clear ();
|
||||
|
@ -498,16 +497,6 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
|
|||
});
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::list_below_version (std::vector<std::shared_ptr<nano::transport::channel>> & channels_a, uint8_t cutoff_version_a)
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock (mutex);
|
||||
// clang-format off
|
||||
nano::transform_if (channels.get<random_access_tag> ().begin (), channels.get<random_access_tag> ().end (), std::back_inserter (channels_a),
|
||||
[cutoff_version_a](auto & channel_a) { return channel_a.channel->get_network_version () < cutoff_version_a; },
|
||||
[](auto const & channel) { return channel.channel; });
|
||||
// clang-format on
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a)
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock (mutex);
|
||||
|
@ -524,8 +513,8 @@ void nano::transport::tcp_channels::modify (std::shared_ptr<nano::transport::cha
|
|||
auto existing (channels.get<endpoint_tag> ().find (channel_a->get_tcp_endpoint ()));
|
||||
if (existing != channels.get<endpoint_tag> ().end ())
|
||||
{
|
||||
channels.get<endpoint_tag> ().modify (existing, [modify_callback_a] (channel_tcp_wrapper & wrapper_a) {
|
||||
modify_callback_a (wrapper_a.channel);
|
||||
channels.get<endpoint_tag> ().modify (existing, [modify_callback = std::move (modify_callback_a)] (channel_tcp_wrapper & wrapper_a) {
|
||||
modify_callback (wrapper_a.channel);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -679,7 +668,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
|
|||
|
||||
if (!node_l->flags.disable_initial_telemetry_requests)
|
||||
{
|
||||
node_l->telemetry->get_metrics_single_peer_async (channel_a, [] (nano::telemetry_data_response /* unused */) {
|
||||
node_l->telemetry->get_metrics_single_peer_async (channel_a, [] (nano::telemetry_data_response const &) {
|
||||
// Intentionally empty, starts the telemetry request cycle to more quickly disconnect from invalid peers
|
||||
});
|
||||
}
|
||||
|
|
|
@ -34,9 +34,11 @@ namespace transport
|
|||
|
||||
public:
|
||||
channel_tcp (nano::node &, std::weak_ptr<nano::socket>);
|
||||
~channel_tcp ();
|
||||
~channel_tcp () override;
|
||||
std::size_t hash_code () const override;
|
||||
bool operator== (nano::transport::channel const &) const override;
|
||||
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
|
||||
//
|
||||
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override;
|
||||
std::string to_string () const override;
|
||||
bool operator== (nano::transport::channel_tcp const & other_a) const
|
||||
|
@ -75,7 +77,7 @@ namespace transport
|
|||
friend class telemetry_simultaneous_requests_Test;
|
||||
|
||||
public:
|
||||
tcp_channels (nano::node &, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> = nullptr);
|
||||
explicit tcp_channels (nano::node &, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> = nullptr);
|
||||
bool insert (std::shared_ptr<nano::transport::channel_tcp> const &, std::shared_ptr<nano::socket> const &, std::shared_ptr<nano::bootstrap_server> const &);
|
||||
void erase (nano::tcp_endpoint const &);
|
||||
std::size_t size () const;
|
||||
|
@ -99,7 +101,6 @@ namespace transport
|
|||
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
|
||||
void purge (std::chrono::steady_clock::time_point const &);
|
||||
void ongoing_keepalive ();
|
||||
void list_below_version (std::vector<std::shared_ptr<nano::transport::channel>> &, uint8_t);
|
||||
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
|
||||
void modify (std::shared_ptr<nano::transport::channel_tcp> const &, std::function<void (std::shared_ptr<nano::transport::channel_tcp> const &)>);
|
||||
void update (nano::tcp_endpoint const &);
|
||||
|
@ -145,8 +146,8 @@ namespace transport
|
|||
std::shared_ptr<nano::transport::channel_tcp> channel;
|
||||
std::shared_ptr<nano::socket> socket;
|
||||
std::shared_ptr<nano::bootstrap_server> response_server;
|
||||
channel_tcp_wrapper (std::shared_ptr<nano::transport::channel_tcp> const & channel_a, std::shared_ptr<nano::socket> const & socket_a, std::shared_ptr<nano::bootstrap_server> const & server_a) :
|
||||
channel (channel_a), socket (socket_a), response_server (server_a)
|
||||
channel_tcp_wrapper (std::shared_ptr<nano::transport::channel_tcp> channel_a, std::shared_ptr<nano::socket> socket_a, std::shared_ptr<nano::bootstrap_server> server_a) :
|
||||
channel (std::move (channel_a)), socket (std::move (socket_a)), response_server (std::move (server_a))
|
||||
{
|
||||
}
|
||||
nano::tcp_endpoint endpoint () const
|
||||
|
|
|
@ -14,9 +14,9 @@ class bandwidth_limiter final
|
|||
{
|
||||
public:
|
||||
// initialize with limit 0 = unbounded
|
||||
bandwidth_limiter (double const, std::size_t const);
|
||||
bandwidth_limiter (double, std::size_t);
|
||||
bool should_drop (std::size_t const &);
|
||||
void reset (double const, std::size_t const);
|
||||
void reset (double, std::size_t);
|
||||
|
||||
private:
|
||||
nano::rate::token_bucket bucket;
|
||||
|
@ -24,7 +24,6 @@ private:
|
|||
|
||||
namespace transport
|
||||
{
|
||||
class message;
|
||||
nano::endpoint map_endpoint_to_v6 (nano::endpoint const &);
|
||||
nano::endpoint map_tcp_to_endpoint (nano::tcp_endpoint const &);
|
||||
nano::tcp_endpoint map_endpoint_to_tcp (nano::endpoint const &);
|
||||
|
@ -47,11 +46,13 @@ namespace transport
|
|||
class channel
|
||||
{
|
||||
public:
|
||||
channel (nano::node &);
|
||||
explicit channel (nano::node &);
|
||||
virtual ~channel () = default;
|
||||
virtual std::size_t hash_code () const = 0;
|
||||
virtual bool operator== (nano::transport::channel const &) const = 0;
|
||||
void send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a = nullptr, nano::buffer_drop_policy policy_a = nano::buffer_drop_policy::limiter);
|
||||
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
|
||||
//
|
||||
virtual void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) = 0;
|
||||
virtual std::string to_string () const = 0;
|
||||
virtual nano::endpoint get_endpoint () const = 0;
|
||||
|
@ -145,9 +146,11 @@ namespace transport
|
|||
class channel_loopback final : public nano::transport::channel
|
||||
{
|
||||
public:
|
||||
channel_loopback (nano::node &);
|
||||
explicit channel_loopback (nano::node &);
|
||||
std::size_t hash_code () const override;
|
||||
bool operator== (nano::transport::channel const &) const override;
|
||||
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
|
||||
//
|
||||
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override;
|
||||
std::string to_string () const override;
|
||||
bool operator== (nano::transport::channel_loopback const & other_a) const
|
||||
|
|
|
@ -111,7 +111,7 @@ std::shared_ptr<nano::transport::channel_udp> nano::transport::udp_channels::ins
|
|||
else
|
||||
{
|
||||
result = std::make_shared<nano::transport::channel_udp> (*this, endpoint_a, network_version_a);
|
||||
channels.get<endpoint_tag> ().insert (result);
|
||||
channels.get<endpoint_tag> ().insert (channel_udp_wrapper{ result });
|
||||
attempts.get<endpoint_tag> ().erase (endpoint_a);
|
||||
lock.unlock ();
|
||||
node.network.channel_observer (result);
|
||||
|
@ -733,16 +733,6 @@ void nano::transport::udp_channels::ongoing_keepalive ()
|
|||
});
|
||||
}
|
||||
|
||||
void nano::transport::udp_channels::list_below_version (std::vector<std::shared_ptr<nano::transport::channel>> & channels_a, uint8_t cutoff_version_a)
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock (mutex);
|
||||
// clang-format off
|
||||
nano::transform_if (channels.get<random_access_tag> ().begin (), channels.get<random_access_tag> ().end (), std::back_inserter (channels_a),
|
||||
[cutoff_version_a](auto & channel_a) { return channel_a.channel->get_network_version () < cutoff_version_a; },
|
||||
[](auto const & channel) { return channel.channel; });
|
||||
// clang-format on
|
||||
}
|
||||
|
||||
void nano::transport::udp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a, uint8_t minimum_version_a)
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock (mutex);
|
||||
|
|
|
@ -29,6 +29,8 @@ namespace transport
|
|||
channel_udp (nano::transport::udp_channels &, nano::endpoint const &, uint8_t protocol_version);
|
||||
std::size_t hash_code () const override;
|
||||
bool operator== (nano::transport::channel const &) const override;
|
||||
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
|
||||
//
|
||||
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override;
|
||||
std::string to_string () const override;
|
||||
bool operator== (nano::transport::channel_udp const & other_a) const
|
||||
|
@ -104,7 +106,6 @@ namespace transport
|
|||
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
|
||||
void purge (std::chrono::steady_clock::time_point const &);
|
||||
void ongoing_keepalive ();
|
||||
void list_below_version (std::vector<std::shared_ptr<nano::transport::channel>> &, uint8_t);
|
||||
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0);
|
||||
void modify (std::shared_ptr<nano::transport::channel_udp> const &, std::function<void (std::shared_ptr<nano::transport::channel_udp> const &)>);
|
||||
nano::node & node;
|
||||
|
@ -140,8 +141,8 @@ namespace transport
|
|||
{
|
||||
public:
|
||||
std::shared_ptr<nano::transport::channel_udp> channel;
|
||||
channel_udp_wrapper (std::shared_ptr<nano::transport::channel_udp> const & channel_a) :
|
||||
channel (channel_a)
|
||||
explicit channel_udp_wrapper (std::shared_ptr<nano::transport::channel_udp> channel_a) :
|
||||
channel (std::move (channel_a))
|
||||
{
|
||||
}
|
||||
nano::endpoint endpoint () const
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue