Use attempts list for TCP channels (#2581)
* Use attempts list for TCP channels to prevent starting multiple concurrent connections to the same peer
* Erase from the attempts list after a realtime TCP connection fails (udp_fallback function) or succeeds (insert function)
* Explicitly close sockets after a realtime TCP connection start failure
* Use tags for the TCP & UDP attempts lists
* New test for max attempts
* Limit max peers per IP to 5 for live & beta networks
* Debug assert if the limit is exceeded in tests
* Add a special flag to allow using more connections
parent b23d7c315a
commit c42709d868

13 changed files with 79 additions and 17 deletions
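For context, here is the pattern this commit introduces, reduced to a minimal standalone sketch (simplified types; not the node's actual `tcp_channels` API, whose attempts container is also indexed by IP address and timestamp): an endpoint is recorded when a connection attempt starts, a concurrent reachout to the same endpoint is rejected, and the record is erased on success or failure.

```cpp
#include <mutex>
#include <set>
#include <string>

// Minimal attempts-list sketch. The endpoint is a plain string here; the
// real container is a boost::multi_index_container of tcp_endpoint_attempt.
class attempt_tracker
{
public:
	// Mirrors the reachout () contract: returns true (reject) if an attempt
	// to this endpoint is already in flight.
	bool reachout (std::string const & endpoint_a)
	{
		std::lock_guard<std::mutex> lock (mutex);
		return !attempts.insert (endpoint_a).second;
	}
	// Called on success (channel inserted) or failure (UDP fallback).
	void erase (std::string const & endpoint_a)
	{
		std::lock_guard<std::mutex> lock (mutex);
		attempts.erase (endpoint_a);
	}

private:
	std::mutex mutex;
	std::set<std::string> attempts;
};
```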
@@ -927,6 +927,25 @@ TEST (network, replace_port)
     node1->stop ();
 }
 
+TEST (network, peer_max_tcp_attempts)
+{
+    nano::system system (1);
+    auto node (system.nodes[0]);
+    // Add nodes that can accept TCP connection, but not node ID handshake
+    nano::node_flags node_flags;
+    node_flags.disable_tcp_realtime = true;
+    for (auto i (0); i < node->network_params.node.max_peers_per_ip; ++i)
+    {
+        auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
+        node2->start ();
+        system.nodes.push_back (node2);
+        // Start TCP attempt
+        node->network.merge_peer (node2->network.endpoint ());
+    }
+    ASSERT_EQ (0, node->network.size ());
+    ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (node->network.endpoint ().address (), nano::get_available_port ())));
+}
+
 // The test must be completed in less than 1 second
 TEST (bandwidth_limiter, validate)
 {

@@ -11,6 +11,7 @@
 #include <numeric>
 
 nano::network::network (nano::node & node_a, uint16_t port_a) :
+syn_cookies (node_a.network_params.node.max_peers_per_ip),
 buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
 resolver (node_a.io_ctx),
 limiter (node_a.config.bandwidth_limit),

@@ -815,6 +816,11 @@ void nano::message_buffer_manager::stop ()
     condition.notify_all ();
 }
 
+nano::syn_cookies::syn_cookies (size_t max_cookies_per_ip_a) :
+max_cookies_per_ip (max_cookies_per_ip_a)
+{
+}
+
 boost::optional<nano::uint256_union> nano::syn_cookies::assign (nano::endpoint const & endpoint_a)
 {
     auto ip_addr (endpoint_a.address ());

@@ -822,7 +828,7 @@ boost::optional<nano::uint256_union> nano::syn_cookies::assign (nano::endpoint c
     nano::lock_guard<std::mutex> lock (syn_cookie_mutex);
     unsigned & ip_cookies = cookies_per_ip[ip_addr];
     boost::optional<nano::uint256_union> result;
-    if (ip_cookies < nano::transport::max_peers_per_ip)
+    if (ip_cookies < max_cookies_per_ip)
     {
         if (cookies.find (endpoint_a) == cookies.end ())
         {

@@ -71,6 +71,7 @@ private:
 class syn_cookies final
 {
 public:
+    syn_cookies (size_t);
     void purge (std::chrono::steady_clock::time_point const &);
     // Returns boost::none if the IP is rate capped on syn cookie requests,
     // or if the endpoint already has a syn cookie query

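A hedged usage sketch of the capped `syn_cookies` API above. The include path and port number are illustrative assumptions; the contract itself (boost::none when the IP is rate capped or the endpoint already has a query) is stated in the comment in this hunk.

```cpp
#include <nano/node/network.hpp> // assumed header for nano::syn_cookies

void syn_cookie_example ()
{
	// The real constructor call takes the cap from network params.
	nano::syn_cookies cookies (5);
	nano::endpoint endpoint (boost::asio::ip::address_v6::loopback (), 24000);
	if (auto cookie = cookies.assign (endpoint))
	{
		// Cookie issued; embed it in the node ID handshake query.
		(void)cookie;
	}
	else
	{
		// boost::none: this IP is rate capped, or the endpoint already
		// has an outstanding syn cookie query.
	}
}
```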
@@ -90,6 +91,7 @@ private:
     mutable std::mutex syn_cookie_mutex;
     std::unordered_map<nano::endpoint, syn_cookie_info> cookies;
     std::unordered_map<boost::asio::ip::address, unsigned> cookies_per_ip;
+    size_t max_cookies_per_ip;
 };
 class network final
 {

@@ -132,6 +132,7 @@ public:
     bool disable_block_processor_republishing{ false };
     bool disable_ongoing_telemetry_requests{ false };
     bool allow_bootstrap_peers_duplicates{ false };
+    bool disable_max_peers_per_ip{ false }; // For testing only
     bool fast_bootstrap{ false };
     bool read_only{ false };
     nano::confirmation_height_mode confirmation_height_processor_mode{ nano::confirmation_height_mode::automatic };

@@ -37,6 +37,7 @@ std::shared_ptr<nano::node> nano::system::add_node (nano::node_config const & no
     nodes.push_back (node);
     if (nodes.size () > 1)
     {
+        debug_assert (nodes.size () - 1 <= node->network_params.node.max_peers_per_ip); // Check that we don't start more nodes than limit for single IP address
         auto begin = nodes.end () - 2;
         for (auto i (begin), j (begin + 1), n (nodes.end ()); j != n; ++i, ++j)
         {

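The debug_assert above caps how many nodes a test may start on a single IP. A test that genuinely needs more opts out with the new node flag (presumably also relaxing this check); this is exactly what the `fork_storm` change at the end of this diff does:

```cpp
// Opt-out used by tests that legitimately run many nodes on localhost
// (taken verbatim from the fork_storm hunk below).
nano::node_flags flags;
flags.disable_max_peers_per_ip = true;
nano::system system (64, nano::transport::transport_type::tcp, flags);
```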
@@ -116,6 +116,7 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::cha
         channels.get<node_id_tag> ().erase (node_id);
     }
     channels.get<endpoint_tag> ().emplace (channel_a, socket_a, bootstrap_server_a);
+    attempts.get<endpoint_tag> ().erase (endpoint);
     error = false;
     lock.unlock ();
     node.network.channel_observer (channel_a);

@@ -349,8 +350,16 @@ void nano::transport::tcp_channels::stop ()
 
 bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a)
 {
-    nano::unique_lock<std::mutex> lock (mutex);
-    bool result (channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= nano::transport::max_peers_per_ip);
+    bool result (false);
+    if (!node.flags.disable_max_peers_per_ip)
+    {
+        nano::unique_lock<std::mutex> lock (mutex);
+        result = channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= node.network_params.node.max_peers_per_ip;
+        if (!result)
+        {
+            result = attempts.get<ip_address_tag> ().count (endpoint_a.address ()) >= node.network_params.node.max_peers_per_ip;
+        }
+    }
     return result;
 }
 

@@ -396,8 +405,8 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point
     auto disconnect_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (cutoff_a));
     channels.get<last_packet_sent_tag> ().erase (channels.get<last_packet_sent_tag> ().begin (), disconnect_cutoff);
     // Remove keepalive attempt tracking for attempts older than cutoff
-    auto attempts_cutoff (attempts.get<1> ().lower_bound (cutoff_a));
-    attempts.get<1> ().erase (attempts.get<1> ().begin (), attempts_cutoff);
+    auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_a));
+    attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);
 
     // Cleanup any sockets which may still be existing from failed node id handshakes
     node_id_handshake_sockets.erase (std::remove_if (node_id_handshake_sockets.begin (), node_id_handshake_sockets.end (), [this](auto socket) {

@@ -545,6 +554,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
     if (auto socket_l = channel->socket.lock ())
     {
         node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l);
+        socket_l->close ();
     }
     if (node_l->config.logging.network_node_id_handshake_logging ())
     {

@@ -576,6 +586,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
     if (auto socket_l = socket_w.lock ())
     {
         node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l);
+        socket_l->close ();
     }
 }
 };

@@ -677,6 +688,10 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
 
 void nano::transport::tcp_channels::udp_fallback (nano::endpoint const & endpoint_a, std::function<void(std::shared_ptr<nano::transport::channel>)> const & callback_a)
 {
+    {
+        nano::lock_guard<std::mutex> lock (mutex);
+        attempts.get<endpoint_tag> ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a));
+    }
     if (callback_a && !node.flags.disable_udp)
     {
         auto channel_udp (node.network.udp_channels.create (endpoint_a));

@@ -131,6 +131,9 @@ namespace transport
     class last_bootstrap_attempt_tag
     {
     };
+    class last_attempt_tag
+    {
+    };
     class node_id_tag
     {
     };

@@ -171,10 +174,12 @@ namespace transport
     {
     public:
         nano::tcp_endpoint endpoint;
+        boost::asio::ip::address address;
         std::chrono::steady_clock::time_point last_attempt{ std::chrono::steady_clock::now () };
 
         explicit tcp_endpoint_attempt (nano::tcp_endpoint const & endpoint_a) :
-        endpoint (endpoint_a)
+        endpoint (endpoint_a),
+        address (endpoint_a.address ())
         {
         }
     };

@@ -196,9 +201,11 @@ namespace transport
     channels;
     boost::multi_index_container<tcp_endpoint_attempt,
     mi::indexed_by<
-    mi::hashed_unique<
+    mi::hashed_unique<mi::tag<endpoint_tag>,
     mi::member<tcp_endpoint_attempt, nano::tcp_endpoint, &tcp_endpoint_attempt::endpoint>>,
-    mi::ordered_non_unique<
+    mi::hashed_non_unique<mi::tag<ip_address_tag>,
+    mi::member<tcp_endpoint_attempt, boost::asio::ip::address, &tcp_endpoint_attempt::address>>,
+    mi::ordered_non_unique<mi::tag<last_attempt_tag>,
     mi::member<tcp_endpoint_attempt, std::chrono::steady_clock::time_point, &tcp_endpoint_attempt::last_attempt>>>>
     attempts;
     // clang-format on

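For readers unfamiliar with tagged Boost.MultiIndex containers, here is a self-contained sketch of the pattern the `attempts` container above uses: one set of records reachable through a hashed unique index by endpoint, a hashed non-unique index by IP address (backing the per-IP cap), and an ordered index by timestamp (backing `purge`). Types are simplified stand-ins, not the node's own.

```cpp
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/tag.hpp>
#include <boost/multi_index_container.hpp>

#include <chrono>
#include <string>

namespace mi = boost::multi_index;

// Simplified stand-ins: the real record stores nano::tcp_endpoint and
// boost::asio::ip::address.
struct attempt
{
	std::string endpoint;
	std::string address;
	std::chrono::steady_clock::time_point last_attempt{ std::chrono::steady_clock::now () };
};

struct endpoint_tag {};
struct ip_address_tag {};
struct last_attempt_tag {};

// One record set, three access paths: unique by endpoint, non-unique by IP
// (per-IP cap), ordered by timestamp (purging stale attempts).
using attempt_container = boost::multi_index_container<attempt,
mi::indexed_by<
mi::hashed_unique<mi::tag<endpoint_tag>, mi::member<attempt, std::string, &attempt::endpoint>>,
mi::hashed_non_unique<mi::tag<ip_address_tag>, mi::member<attempt, std::string, &attempt::address>>,
mi::ordered_non_unique<mi::tag<last_attempt_tag>, mi::member<attempt, std::chrono::steady_clock::time_point, &attempt::last_attempt>>>>;

size_t attempts_from_ip (attempt_container const & attempts, std::string const & ip)
{
	// Backs the max_ip_connections () check.
	return attempts.get<ip_address_tag> ().count (ip);
}

void purge_older_than (attempt_container & attempts, std::chrono::steady_clock::time_point cutoff)
{
	// Backs purge (): drop every attempt whose timestamp precedes the cutoff.
	auto & by_time (attempts.get<last_attempt_tag> ());
	by_time.erase (by_time.begin (), by_time.lower_bound (cutoff));
}
```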
@@ -43,8 +43,6 @@ namespace transport
     nano::tcp_endpoint map_endpoint_to_tcp (nano::endpoint const &);
     // Unassigned, reserved, self
     bool reserved_address (nano::endpoint const &, bool = false);
-    // Maximum number of peers per IP
-    static size_t constexpr max_peers_per_ip = 10;
     static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5);
     enum class transport_type : uint8_t
     {

@@ -116,6 +116,7 @@ std::shared_ptr<nano::transport::channel_udp> nano::transport::udp_channels::ins
     {
         result = std::make_shared<nano::transport::channel_udp> (*this, endpoint_a, network_version_a);
         channels.get<endpoint_tag> ().insert (result);
+        attempts.get<endpoint_tag> ().erase (endpoint_a);
         lock.unlock ();
         node.network.channel_observer (result);
     }

@@ -633,8 +634,12 @@ std::shared_ptr<nano::transport::channel> nano::transport::udp_channels::create
 
 bool nano::transport::udp_channels::max_ip_connections (nano::endpoint const & endpoint_a)
 {
-    nano::unique_lock<std::mutex> lock (mutex);
-    bool result (channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= nano::transport::max_peers_per_ip);
+    bool result (false);
+    if (!node.flags.disable_max_peers_per_ip)
+    {
+        nano::unique_lock<std::mutex> lock (mutex);
+        result = channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= node.network_params.node.max_peers_per_ip;
+    }
     return result;
 }
 

@@ -677,8 +682,8 @@ void nano::transport::udp_channels::purge (std::chrono::steady_clock::time_point
     auto disconnect_cutoff (channels.get<last_packet_received_tag> ().lower_bound (cutoff_a));
     channels.get<last_packet_received_tag> ().erase (channels.get<last_packet_received_tag> ().begin (), disconnect_cutoff);
     // Remove keepalive attempt tracking for attempts older than cutoff
-    auto attempts_cutoff (attempts.get<1> ().lower_bound (cutoff_a));
-    attempts.get<1> ().erase (attempts.get<1> ().begin (), attempts_cutoff);
+    auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_a));
+    attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);
 }
 
 void nano::transport::udp_channels::ongoing_keepalive ()

@@ -124,6 +124,9 @@ namespace transport
     class last_bootstrap_attempt_tag
     {
     };
+    class last_attempt_tag
+    {
+    };
     class node_id_tag
     {
     };

@@ -191,9 +194,9 @@ namespace transport
     boost::multi_index_container<
     endpoint_attempt,
     mi::indexed_by<
-    mi::hashed_unique<
+    mi::hashed_unique<mi::tag<endpoint_tag>,
     mi::member<endpoint_attempt, nano::endpoint, &endpoint_attempt::endpoint>>,
-    mi::ordered_non_unique<
+    mi::ordered_non_unique<mi::tag<last_attempt_tag>,
     mi::member<endpoint_attempt, std::chrono::steady_clock::time_point, &endpoint_attempt::last_attempt>>>>
     attempts;
     // clang-format on

@@ -135,6 +135,7 @@ nano::node_constants::node_constants (nano::network_constants & network_constant
     peer_interval = search_pending_interval;
     unchecked_cleaning_interval = std::chrono::minutes (30);
     process_confirmed_interval = network_constants.is_test_network () ? std::chrono::milliseconds (50) : std::chrono::milliseconds (500);
+    max_peers_per_ip = network_constants.is_test_network () ? 10 : 5;
     max_weight_samples = network_constants.is_live_network () ? 4032 : 864;
     weight_period = 5 * 60; // 5 minutes
 }

@@ -419,6 +419,8 @@ public:
     std::chrono::seconds peer_interval;
     std::chrono::minutes unchecked_cleaning_interval;
     std::chrono::milliseconds process_confirmed_interval;
+    /** Maximum number of peers per IP */
+    size_t max_peers_per_ip;
 
     /** The maximum amount of samples for a 2 week period on live or 3 days on beta */
     uint64_t max_weight_samples;

@@ -187,7 +187,9 @@ TEST (store, load)
 // ulimit -n increasing may be required
 TEST (node, fork_storm)
 {
-    nano::system system (64);
+    nano::node_flags flags;
+    flags.disable_max_peers_per_ip = true;
+    nano::system system (64, nano::transport::transport_type::tcp, flags);
     system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
     auto previous (system.nodes[0]->latest (nano::test_genesis_key.pub));
     auto balance (system.nodes[0]->balance (nano::test_genesis_key.pub));