Separate source of tcp_channels and channel_tcp
This commit is contained in:
parent
77a314222a
commit
0906a4f33c
17 changed files with 373 additions and 369 deletions
|
|
@ -1,5 +1,4 @@
|
|||
#include <nano/node/transport/socket.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
#include <nano/node/transport/tcp_server.hpp>
|
||||
#include <nano/test_common/network.hpp>
|
||||
#include <nano/test_common/system.hpp>
|
||||
|
|
|
|||
|
|
@ -157,6 +157,8 @@ add_library(
|
|||
telemetry.cpp
|
||||
transport/channel.hpp
|
||||
transport/channel.cpp
|
||||
transport/channel_tcp.hpp
|
||||
transport/channel_tcp.cpp
|
||||
transport/fake.hpp
|
||||
transport/fake.cpp
|
||||
transport/inproc.hpp
|
||||
|
|
@ -165,8 +167,8 @@ add_library(
|
|||
transport/message_deserializer.cpp
|
||||
transport/socket.hpp
|
||||
transport/socket.cpp
|
||||
transport/tcp.hpp
|
||||
transport/tcp.cpp
|
||||
transport/tcp_channels.hpp
|
||||
transport/tcp_channels.cpp
|
||||
transport/tcp_listener.hpp
|
||||
transport/tcp_listener.cpp
|
||||
transport/tcp_server.hpp
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
#include <nano/node/bootstrap/bootstrap_connections.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
#include <nano/secure/ledger.hpp>
|
||||
#include <nano/secure/ledger_set_any.hpp>
|
||||
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@
|
|||
#include <nano/node/bootstrap/bootstrap_bulk_push.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_legacy.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@
|
|||
#include <nano/node/bootstrap/bootstrap_frontier.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_legacy.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
#include <nano/secure/ledger.hpp>
|
||||
#include <nano/secure/ledger_set_any.hpp>
|
||||
|
||||
|
|
|
|||
|
|
@ -2,8 +2,11 @@
|
|||
|
||||
#include <nano/lib/logging.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/messages.hpp>
|
||||
#include <nano/node/peer_exclusion.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
#include <nano/node/transport/common.hpp>
|
||||
#include <nano/node/transport/fwd.hpp>
|
||||
#include <nano/node/transport/tcp_channels.hpp>
|
||||
#include <nano/secure/network_filter.hpp>
|
||||
|
||||
#include <deque>
|
||||
|
|
|
|||
101
nano/node/transport/channel_tcp.cpp
Normal file
101
nano/node/transport/channel_tcp.cpp
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
#include <nano/lib/stats.hpp>
|
||||
#include <nano/lib/utility.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/channel_tcp.hpp>
|
||||
#include <nano/node/transport/message_deserializer.hpp>
|
||||
|
||||
/*
|
||||
* channel_tcp
|
||||
*/
|
||||
|
||||
nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::weak_ptr<nano::transport::socket> socket_a) :
|
||||
channel (node_a),
|
||||
socket (std::move (socket_a))
|
||||
{
|
||||
}
|
||||
|
||||
nano::transport::channel_tcp::~channel_tcp ()
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk{ channel_mutex };
|
||||
// Close socket. Exception: socket is used by tcp_server
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
socket_l->close ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::transport::channel_tcp::update_endpoints ()
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk (channel_mutex);
|
||||
|
||||
debug_assert (endpoint == nano::endpoint{}); // Not initialized endpoint value
|
||||
debug_assert (local_endpoint == nano::endpoint{}); // Not initialized endpoint value
|
||||
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
endpoint = socket_l->remote_endpoint ();
|
||||
local_endpoint = socket_l->local_endpoint ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy policy_a, nano::transport::traffic_type traffic_type)
|
||||
{
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
if (!socket_l->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full (traffic_type)))
|
||||
{
|
||||
socket_l->async_write (
|
||||
buffer_a, [this_s = shared_from_this (), endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr<nano::node>{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
if (auto node_l = node.lock ())
|
||||
{
|
||||
if (!ec)
|
||||
{
|
||||
this_s->set_last_packet_sent (std::chrono::steady_clock::now ());
|
||||
}
|
||||
if (ec == boost::system::errc::host_unreachable)
|
||||
{
|
||||
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out);
|
||||
}
|
||||
if (callback_a)
|
||||
{
|
||||
callback_a (ec, size_a);
|
||||
}
|
||||
}
|
||||
},
|
||||
traffic_type);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (policy_a == nano::transport::buffer_drop_policy::no_socket_drop)
|
||||
{
|
||||
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
|
||||
}
|
||||
else
|
||||
{
|
||||
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
|
||||
}
|
||||
if (callback_a)
|
||||
{
|
||||
callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (callback_a)
|
||||
{
|
||||
node.background ([callback_a] () {
|
||||
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
std::string nano::transport::channel_tcp::to_string () const
|
||||
{
|
||||
return nano::util::to_str (get_tcp_endpoint ());
|
||||
}
|
||||
|
||||
void nano::transport::channel_tcp::operator() (nano::object_stream & obs) const
|
||||
{
|
||||
nano::transport::channel::operator() (obs); // Write common data
|
||||
|
||||
obs.write ("socket", socket);
|
||||
}
|
||||
86
nano/node/transport/channel_tcp.hpp
Normal file
86
nano/node/transport/channel_tcp.hpp
Normal file
|
|
@ -0,0 +1,86 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/node/transport/channel.hpp>
|
||||
#include <nano/node/transport/transport.hpp>
|
||||
|
||||
namespace nano::transport
|
||||
{
|
||||
class tcp_server;
|
||||
class tcp_channels;
|
||||
class channel_tcp;
|
||||
|
||||
class channel_tcp : public nano::transport::channel, public std::enable_shared_from_this<channel_tcp>
|
||||
{
|
||||
friend class nano::transport::tcp_channels;
|
||||
|
||||
public:
|
||||
channel_tcp (nano::node &, std::weak_ptr<nano::transport::socket>);
|
||||
~channel_tcp () override;
|
||||
|
||||
void update_endpoints ();
|
||||
|
||||
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions//
|
||||
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override;
|
||||
|
||||
std::string to_string () const override;
|
||||
|
||||
nano::endpoint get_endpoint () const override
|
||||
{
|
||||
return nano::transport::map_tcp_to_endpoint (get_tcp_endpoint ());
|
||||
}
|
||||
|
||||
nano::tcp_endpoint get_tcp_endpoint () const override
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk (channel_mutex);
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
nano::endpoint get_local_endpoint () const override
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk (channel_mutex);
|
||||
return local_endpoint;
|
||||
}
|
||||
|
||||
nano::transport::transport_type get_type () const override
|
||||
{
|
||||
return nano::transport::transport_type::tcp;
|
||||
}
|
||||
|
||||
bool max (nano::transport::traffic_type traffic_type) override
|
||||
{
|
||||
bool result = true;
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
result = socket_l->max (traffic_type);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
bool alive () const override
|
||||
{
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
return socket_l->alive ();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void close () override
|
||||
{
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
socket_l->close ();
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
std::weak_ptr<nano::transport::socket> socket;
|
||||
|
||||
private:
|
||||
nano::endpoint endpoint;
|
||||
nano::endpoint local_endpoint;
|
||||
|
||||
public: // Logging
|
||||
void operator() (nano::object_stream &) const override;
|
||||
};
|
||||
}
|
||||
|
|
@ -3,4 +3,6 @@
|
|||
namespace nano::transport
|
||||
{
|
||||
class channel;
|
||||
class channel_tcp;
|
||||
class tcp_channels;
|
||||
}
|
||||
|
|
@ -1,254 +0,0 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/lib/random.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/transport/channel.hpp>
|
||||
#include <nano/node/transport/transport.hpp>
|
||||
|
||||
#include <boost/multi_index/hashed_index.hpp>
|
||||
#include <boost/multi_index/mem_fun.hpp>
|
||||
#include <boost/multi_index/member.hpp>
|
||||
#include <boost/multi_index/ordered_index.hpp>
|
||||
#include <boost/multi_index/random_access_index.hpp>
|
||||
#include <boost/multi_index_container.hpp>
|
||||
|
||||
#include <random>
|
||||
#include <thread>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace mi = boost::multi_index;
|
||||
|
||||
namespace nano
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
class tcp_server;
|
||||
class tcp_channels;
|
||||
class channel_tcp;
|
||||
|
||||
class channel_tcp : public nano::transport::channel, public std::enable_shared_from_this<channel_tcp>
|
||||
{
|
||||
friend class nano::transport::tcp_channels;
|
||||
|
||||
public:
|
||||
channel_tcp (nano::node &, std::weak_ptr<nano::transport::socket>);
|
||||
~channel_tcp () override;
|
||||
|
||||
void update_endpoints ();
|
||||
|
||||
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions//
|
||||
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override;
|
||||
|
||||
std::string to_string () const override;
|
||||
|
||||
nano::endpoint get_endpoint () const override
|
||||
{
|
||||
return nano::transport::map_tcp_to_endpoint (get_tcp_endpoint ());
|
||||
}
|
||||
|
||||
nano::tcp_endpoint get_tcp_endpoint () const override
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk (channel_mutex);
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
nano::endpoint get_local_endpoint () const override
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk (channel_mutex);
|
||||
return local_endpoint;
|
||||
}
|
||||
|
||||
nano::transport::transport_type get_type () const override
|
||||
{
|
||||
return nano::transport::transport_type::tcp;
|
||||
}
|
||||
|
||||
bool max (nano::transport::traffic_type traffic_type) override
|
||||
{
|
||||
bool result = true;
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
result = socket_l->max (traffic_type);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
bool alive () const override
|
||||
{
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
return socket_l->alive ();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void close () override
|
||||
{
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
socket_l->close ();
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
std::weak_ptr<nano::transport::socket> socket;
|
||||
|
||||
private:
|
||||
nano::endpoint endpoint;
|
||||
nano::endpoint local_endpoint;
|
||||
|
||||
public: // Logging
|
||||
void operator() (nano::object_stream &) const override;
|
||||
};
|
||||
|
||||
class tcp_channels final
|
||||
{
|
||||
friend class channel_tcp;
|
||||
friend class telemetry_simultaneous_requests_Test;
|
||||
friend class network_peer_max_tcp_attempts_subnetwork_Test;
|
||||
|
||||
public:
|
||||
explicit tcp_channels (nano::node &);
|
||||
~tcp_channels ();
|
||||
|
||||
void start ();
|
||||
void stop ();
|
||||
|
||||
std::shared_ptr<nano::transport::channel_tcp> create (std::shared_ptr<nano::transport::socket> const &, std::shared_ptr<nano::transport::tcp_server> const &, nano::account const & node_id);
|
||||
void erase (nano::tcp_endpoint const &);
|
||||
std::size_t size () const;
|
||||
std::shared_ptr<nano::transport::channel_tcp> find_channel (nano::tcp_endpoint const &) const;
|
||||
void random_fill (std::array<nano::endpoint, 8> &) const;
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t, uint8_t = 0, bool = false) const;
|
||||
std::shared_ptr<nano::transport::channel_tcp> find_node_id (nano::account const &);
|
||||
// Get the next peer for attempting a tcp connection
|
||||
nano::tcp_endpoint bootstrap_peer ();
|
||||
bool max_ip_connections (nano::tcp_endpoint const & endpoint_a);
|
||||
bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a);
|
||||
bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a);
|
||||
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
|
||||
bool track_reachout (nano::endpoint const &);
|
||||
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
|
||||
void purge (std::chrono::steady_clock::time_point cutoff_deadline);
|
||||
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
|
||||
void modify (std::shared_ptr<nano::transport::channel_tcp> const &, std::function<void (std::shared_ptr<nano::transport::channel_tcp> const &)>);
|
||||
void keepalive ();
|
||||
std::optional<nano::keepalive> sample_keepalive ();
|
||||
|
||||
// Connection start
|
||||
void start_tcp (nano::endpoint const &);
|
||||
|
||||
private: // Dependencies
|
||||
nano::node & node;
|
||||
|
||||
private:
|
||||
void close ();
|
||||
bool check (nano::tcp_endpoint const &, nano::account const & node_id) const;
|
||||
|
||||
private:
|
||||
class channel_entry final
|
||||
{
|
||||
public:
|
||||
std::shared_ptr<nano::transport::channel_tcp> channel;
|
||||
std::shared_ptr<nano::transport::socket> socket;
|
||||
std::shared_ptr<nano::transport::tcp_server> response_server;
|
||||
|
||||
public:
|
||||
channel_entry (std::shared_ptr<nano::transport::channel_tcp> channel_a, std::shared_ptr<nano::transport::socket> socket_a, std::shared_ptr<nano::transport::tcp_server> server_a) :
|
||||
channel (std::move (channel_a)), socket (std::move (socket_a)), response_server (std::move (server_a))
|
||||
{
|
||||
}
|
||||
nano::tcp_endpoint endpoint () const
|
||||
{
|
||||
return channel->get_tcp_endpoint ();
|
||||
}
|
||||
std::chrono::steady_clock::time_point last_bootstrap_attempt () const
|
||||
{
|
||||
return channel->get_last_bootstrap_attempt ();
|
||||
}
|
||||
boost::asio::ip::address ip_address () const
|
||||
{
|
||||
return nano::transport::ipv4_address_or_ipv6_subnet (endpoint ().address ());
|
||||
}
|
||||
boost::asio::ip::address subnetwork () const
|
||||
{
|
||||
return nano::transport::map_address_to_subnetwork (endpoint ().address ());
|
||||
}
|
||||
nano::account node_id () const
|
||||
{
|
||||
return channel->get_node_id ();
|
||||
}
|
||||
uint8_t network_version () const
|
||||
{
|
||||
return channel->get_network_version ();
|
||||
}
|
||||
};
|
||||
|
||||
class attempt_entry final
|
||||
{
|
||||
public:
|
||||
nano::tcp_endpoint endpoint;
|
||||
boost::asio::ip::address address;
|
||||
boost::asio::ip::address subnetwork;
|
||||
std::chrono::steady_clock::time_point last_attempt{ std::chrono::steady_clock::now () };
|
||||
|
||||
public:
|
||||
explicit attempt_entry (nano::tcp_endpoint const & endpoint_a) :
|
||||
endpoint (endpoint_a),
|
||||
address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ())),
|
||||
subnetwork (nano::transport::map_address_to_subnetwork (endpoint_a.address ()))
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
// clang-format off
|
||||
class endpoint_tag {};
|
||||
class ip_address_tag {};
|
||||
class subnetwork_tag {};
|
||||
class random_access_tag {};
|
||||
class last_bootstrap_attempt_tag {};
|
||||
class last_attempt_tag {};
|
||||
class node_id_tag {};
|
||||
class version_tag {};
|
||||
// clang-format on
|
||||
|
||||
// clang-format off
|
||||
boost::multi_index_container<channel_entry,
|
||||
mi::indexed_by<
|
||||
mi::random_access<mi::tag<random_access_tag>>,
|
||||
mi::ordered_non_unique<mi::tag<last_bootstrap_attempt_tag>,
|
||||
mi::const_mem_fun<channel_entry, std::chrono::steady_clock::time_point, &channel_entry::last_bootstrap_attempt>>,
|
||||
mi::hashed_unique<mi::tag<endpoint_tag>,
|
||||
mi::const_mem_fun<channel_entry, nano::tcp_endpoint, &channel_entry::endpoint>>,
|
||||
mi::hashed_non_unique<mi::tag<node_id_tag>,
|
||||
mi::const_mem_fun<channel_entry, nano::account, &channel_entry::node_id>>,
|
||||
mi::ordered_non_unique<mi::tag<version_tag>,
|
||||
mi::const_mem_fun<channel_entry, uint8_t, &channel_entry::network_version>>,
|
||||
mi::hashed_non_unique<mi::tag<ip_address_tag>,
|
||||
mi::const_mem_fun<channel_entry, boost::asio::ip::address, &channel_entry::ip_address>>,
|
||||
mi::hashed_non_unique<mi::tag<subnetwork_tag>,
|
||||
mi::const_mem_fun<channel_entry, boost::asio::ip::address, &channel_entry::subnetwork>>>>
|
||||
channels;
|
||||
|
||||
boost::multi_index_container<attempt_entry,
|
||||
mi::indexed_by<
|
||||
mi::hashed_unique<mi::tag<endpoint_tag>,
|
||||
mi::member<attempt_entry, nano::tcp_endpoint, &attempt_entry::endpoint>>,
|
||||
mi::hashed_non_unique<mi::tag<ip_address_tag>,
|
||||
mi::member<attempt_entry, boost::asio::ip::address, &attempt_entry::address>>,
|
||||
mi::hashed_non_unique<mi::tag<subnetwork_tag>,
|
||||
mi::member<attempt_entry, boost::asio::ip::address, &attempt_entry::subnetwork>>,
|
||||
mi::ordered_non_unique<mi::tag<last_attempt_tag>,
|
||||
mi::member<attempt_entry, std::chrono::steady_clock::time_point, &attempt_entry::last_attempt>>>>
|
||||
attempts;
|
||||
// clang-format on
|
||||
|
||||
private:
|
||||
std::atomic<bool> stopped{ false };
|
||||
nano::condition_variable condition;
|
||||
mutable nano::mutex mutex;
|
||||
|
||||
mutable nano::random_generator rng;
|
||||
};
|
||||
} // namespace transport
|
||||
} // namespace nano
|
||||
|
|
@ -1,107 +1,5 @@
|
|||
#include <nano/lib/config.hpp>
|
||||
#include <nano/lib/stats.hpp>
|
||||
#include <nano/lib/utility.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/message_deserializer.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
/*
|
||||
* channel_tcp
|
||||
*/
|
||||
|
||||
nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::weak_ptr<nano::transport::socket> socket_a) :
|
||||
channel (node_a),
|
||||
socket (std::move (socket_a))
|
||||
{
|
||||
}
|
||||
|
||||
nano::transport::channel_tcp::~channel_tcp ()
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk{ channel_mutex };
|
||||
// Close socket. Exception: socket is used by tcp_server
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
socket_l->close ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::transport::channel_tcp::update_endpoints ()
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk (channel_mutex);
|
||||
|
||||
debug_assert (endpoint == nano::endpoint{}); // Not initialized endpoint value
|
||||
debug_assert (local_endpoint == nano::endpoint{}); // Not initialized endpoint value
|
||||
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
endpoint = socket_l->remote_endpoint ();
|
||||
local_endpoint = socket_l->local_endpoint ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy policy_a, nano::transport::traffic_type traffic_type)
|
||||
{
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
if (!socket_l->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full (traffic_type)))
|
||||
{
|
||||
socket_l->async_write (
|
||||
buffer_a, [this_s = shared_from_this (), endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr<nano::node>{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
if (auto node_l = node.lock ())
|
||||
{
|
||||
if (!ec)
|
||||
{
|
||||
this_s->set_last_packet_sent (std::chrono::steady_clock::now ());
|
||||
}
|
||||
if (ec == boost::system::errc::host_unreachable)
|
||||
{
|
||||
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out);
|
||||
}
|
||||
if (callback_a)
|
||||
{
|
||||
callback_a (ec, size_a);
|
||||
}
|
||||
}
|
||||
},
|
||||
traffic_type);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (policy_a == nano::transport::buffer_drop_policy::no_socket_drop)
|
||||
{
|
||||
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
|
||||
}
|
||||
else
|
||||
{
|
||||
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
|
||||
}
|
||||
if (callback_a)
|
||||
{
|
||||
callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (callback_a)
|
||||
{
|
||||
node.background ([callback_a] () {
|
||||
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
std::string nano::transport::channel_tcp::to_string () const
|
||||
{
|
||||
return nano::util::to_str (get_tcp_endpoint ());
|
||||
}
|
||||
|
||||
void nano::transport::channel_tcp::operator() (nano::object_stream & obs) const
|
||||
{
|
||||
nano::transport::channel::operator() (obs); // Write common data
|
||||
|
||||
obs.write ("socket", socket);
|
||||
}
|
||||
#include <nano/node/transport/tcp_channels.hpp>
|
||||
|
||||
/*
|
||||
* tcp_channels
|
||||
|
|
@ -551,4 +449,4 @@ void nano::transport::tcp_channels::modify (std::shared_ptr<nano::transport::cha
|
|||
void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
|
||||
{
|
||||
node.tcp_listener.connect (endpoint.address (), endpoint.port ());
|
||||
}
|
||||
}
|
||||
173
nano/node/transport/tcp_channels.hpp
Normal file
173
nano/node/transport/tcp_channels.hpp
Normal file
|
|
@ -0,0 +1,173 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/lib/random.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/transport/channel.hpp>
|
||||
#include <nano/node/transport/channel_tcp.hpp>
|
||||
#include <nano/node/transport/transport.hpp>
|
||||
|
||||
#include <boost/multi_index/hashed_index.hpp>
|
||||
#include <boost/multi_index/mem_fun.hpp>
|
||||
#include <boost/multi_index/member.hpp>
|
||||
#include <boost/multi_index/ordered_index.hpp>
|
||||
#include <boost/multi_index/random_access_index.hpp>
|
||||
#include <boost/multi_index_container.hpp>
|
||||
|
||||
#include <random>
|
||||
#include <thread>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace mi = boost::multi_index;
|
||||
|
||||
namespace nano::transport
|
||||
{
|
||||
class tcp_channels final
|
||||
{
|
||||
friend class channel_tcp;
|
||||
friend class telemetry_simultaneous_requests_Test;
|
||||
friend class network_peer_max_tcp_attempts_subnetwork_Test;
|
||||
|
||||
public:
|
||||
explicit tcp_channels (nano::node &);
|
||||
~tcp_channels ();
|
||||
|
||||
void start ();
|
||||
void stop ();
|
||||
|
||||
std::shared_ptr<nano::transport::channel_tcp> create (std::shared_ptr<nano::transport::socket> const &, std::shared_ptr<nano::transport::tcp_server> const &, nano::account const & node_id);
|
||||
void erase (nano::tcp_endpoint const &);
|
||||
std::size_t size () const;
|
||||
std::shared_ptr<nano::transport::channel_tcp> find_channel (nano::tcp_endpoint const &) const;
|
||||
void random_fill (std::array<nano::endpoint, 8> &) const;
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t, uint8_t = 0, bool = false) const;
|
||||
std::shared_ptr<nano::transport::channel_tcp> find_node_id (nano::account const &);
|
||||
// Get the next peer for attempting a tcp connection
|
||||
nano::tcp_endpoint bootstrap_peer ();
|
||||
bool max_ip_connections (nano::tcp_endpoint const & endpoint_a);
|
||||
bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a);
|
||||
bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a);
|
||||
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
|
||||
bool track_reachout (nano::endpoint const &);
|
||||
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
|
||||
void purge (std::chrono::steady_clock::time_point cutoff_deadline);
|
||||
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
|
||||
void modify (std::shared_ptr<nano::transport::channel_tcp> const &, std::function<void (std::shared_ptr<nano::transport::channel_tcp> const &)>);
|
||||
void keepalive ();
|
||||
std::optional<nano::keepalive> sample_keepalive ();
|
||||
|
||||
// Connection start
|
||||
void start_tcp (nano::endpoint const &);
|
||||
|
||||
private: // Dependencies
|
||||
nano::node & node;
|
||||
|
||||
private:
|
||||
void close ();
|
||||
bool check (nano::tcp_endpoint const &, nano::account const & node_id) const;
|
||||
|
||||
private:
|
||||
class channel_entry final
|
||||
{
|
||||
public:
|
||||
std::shared_ptr<nano::transport::channel_tcp> channel;
|
||||
std::shared_ptr<nano::transport::socket> socket;
|
||||
std::shared_ptr<nano::transport::tcp_server> response_server;
|
||||
|
||||
public:
|
||||
channel_entry (std::shared_ptr<nano::transport::channel_tcp> channel_a, std::shared_ptr<nano::transport::socket> socket_a, std::shared_ptr<nano::transport::tcp_server> server_a) :
|
||||
channel (std::move (channel_a)), socket (std::move (socket_a)), response_server (std::move (server_a))
|
||||
{
|
||||
}
|
||||
nano::tcp_endpoint endpoint () const
|
||||
{
|
||||
return channel->get_tcp_endpoint ();
|
||||
}
|
||||
std::chrono::steady_clock::time_point last_bootstrap_attempt () const
|
||||
{
|
||||
return channel->get_last_bootstrap_attempt ();
|
||||
}
|
||||
boost::asio::ip::address ip_address () const
|
||||
{
|
||||
return nano::transport::ipv4_address_or_ipv6_subnet (endpoint ().address ());
|
||||
}
|
||||
boost::asio::ip::address subnetwork () const
|
||||
{
|
||||
return nano::transport::map_address_to_subnetwork (endpoint ().address ());
|
||||
}
|
||||
nano::account node_id () const
|
||||
{
|
||||
return channel->get_node_id ();
|
||||
}
|
||||
uint8_t network_version () const
|
||||
{
|
||||
return channel->get_network_version ();
|
||||
}
|
||||
};
|
||||
|
||||
class attempt_entry final
|
||||
{
|
||||
public:
|
||||
nano::tcp_endpoint endpoint;
|
||||
boost::asio::ip::address address;
|
||||
boost::asio::ip::address subnetwork;
|
||||
std::chrono::steady_clock::time_point last_attempt{ std::chrono::steady_clock::now () };
|
||||
|
||||
public:
|
||||
explicit attempt_entry (nano::tcp_endpoint const & endpoint_a) :
|
||||
endpoint (endpoint_a),
|
||||
address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ())),
|
||||
subnetwork (nano::transport::map_address_to_subnetwork (endpoint_a.address ()))
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
// clang-format off
|
||||
class endpoint_tag {};
|
||||
class ip_address_tag {};
|
||||
class subnetwork_tag {};
|
||||
class random_access_tag {};
|
||||
class last_bootstrap_attempt_tag {};
|
||||
class last_attempt_tag {};
|
||||
class node_id_tag {};
|
||||
class version_tag {};
|
||||
// clang-format on
|
||||
|
||||
// clang-format off
|
||||
boost::multi_index_container<channel_entry,
|
||||
mi::indexed_by<
|
||||
mi::random_access<mi::tag<random_access_tag>>,
|
||||
mi::ordered_non_unique<mi::tag<last_bootstrap_attempt_tag>,
|
||||
mi::const_mem_fun<channel_entry, std::chrono::steady_clock::time_point, &channel_entry::last_bootstrap_attempt>>,
|
||||
mi::hashed_unique<mi::tag<endpoint_tag>,
|
||||
mi::const_mem_fun<channel_entry, nano::tcp_endpoint, &channel_entry::endpoint>>,
|
||||
mi::hashed_non_unique<mi::tag<node_id_tag>,
|
||||
mi::const_mem_fun<channel_entry, nano::account, &channel_entry::node_id>>,
|
||||
mi::ordered_non_unique<mi::tag<version_tag>,
|
||||
mi::const_mem_fun<channel_entry, uint8_t, &channel_entry::network_version>>,
|
||||
mi::hashed_non_unique<mi::tag<ip_address_tag>,
|
||||
mi::const_mem_fun<channel_entry, boost::asio::ip::address, &channel_entry::ip_address>>,
|
||||
mi::hashed_non_unique<mi::tag<subnetwork_tag>,
|
||||
mi::const_mem_fun<channel_entry, boost::asio::ip::address, &channel_entry::subnetwork>>>>
|
||||
channels;
|
||||
|
||||
boost::multi_index_container<attempt_entry,
|
||||
mi::indexed_by<
|
||||
mi::hashed_unique<mi::tag<endpoint_tag>,
|
||||
mi::member<attempt_entry, nano::tcp_endpoint, &attempt_entry::endpoint>>,
|
||||
mi::hashed_non_unique<mi::tag<ip_address_tag>,
|
||||
mi::member<attempt_entry, boost::asio::ip::address, &attempt_entry::address>>,
|
||||
mi::hashed_non_unique<mi::tag<subnetwork_tag>,
|
||||
mi::member<attempt_entry, boost::asio::ip::address, &attempt_entry::subnetwork>>,
|
||||
mi::ordered_non_unique<mi::tag<last_attempt_tag>,
|
||||
mi::member<attempt_entry, std::chrono::steady_clock::time_point, &attempt_entry::last_attempt>>>>
|
||||
attempts;
|
||||
// clang-format on
|
||||
|
||||
private:
|
||||
std::atomic<bool> stopped{ false };
|
||||
nano::condition_variable condition;
|
||||
mutable nano::mutex mutex;
|
||||
|
||||
mutable nano::random_generator rng;
|
||||
};
|
||||
}
|
||||
|
|
@ -2,7 +2,6 @@
|
|||
#include <nano/lib/interval.hpp>
|
||||
#include <nano/node/messages.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
#include <nano/node/transport/tcp_listener.hpp>
|
||||
#include <nano/node/transport/tcp_server.hpp>
|
||||
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@
|
|||
#include <nano/node/messages.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/message_deserializer.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
#include <nano/node/transport/tcp_listener.hpp>
|
||||
#include <nano/node/transport/tcp_server.hpp>
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/messages.hpp>
|
||||
#include <nano/node/transport/fwd.hpp>
|
||||
#include <nano/node/transport/socket.hpp>
|
||||
|
||||
#include <atomic>
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
#include <nano/lib/threading.hpp>
|
||||
#include <nano/node/ipc/ipc_server.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
#include <nano/rpc/rpc_request_processor.hpp>
|
||||
#include <nano/rpc_test/common.hpp>
|
||||
#include <nano/rpc_test/rpc_context.hpp>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue