Merge pull request #3564 from nanocurrency/network_limiting

Improve and simplify a number of network limiters
This commit is contained in:
clemahieu 2021-11-22 13:48:44 +00:00 committed by GitHub
commit 941331bdb3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 595 additions and 54 deletions

View file

@ -0,0 +1,7 @@
#pragma once
#include <nano/boost/private/macro_warnings.hpp>
DISABLE_ASIO_WARNINGS
#include <boost/asio/ip/network_v6.hpp>
REENABLE_WARNINGS

View file

@ -1,3 +1,4 @@
#include <nano/node/nodeconfig.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/test_common/network.hpp>
#include <nano/test_common/system.hpp>
@ -918,7 +919,10 @@ namespace transport
{
TEST (network, peer_max_tcp_attempts_subnetwork)
{
nano::system system (1);
nano::node_flags node_flags;
node_flags.disable_max_peers_per_ip = true;
nano::system system;
system.add_node (node_flags);
auto node (system.nodes[0]);
for (auto i (0); i < node->network_params.network.max_peers_per_subnetwork; ++i)
{
@ -927,9 +931,9 @@ namespace transport
ASSERT_FALSE (node->network.tcp_channels.reachout (endpoint));
}
ASSERT_EQ (0, node->network.size ());
ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::out));
ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out));
ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (boost::asio::ip::make_address_v6 ("::ffff:127.0.0.1"), nano::get_available_port ())));
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::out));
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out));
}
}
}

View file

@ -1,3 +1,5 @@
#include <nano/boost/asio/ip/address_v6.hpp>
#include <nano/boost/asio/ip/network_v6.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/socket.hpp>
#include <nano/test_common/system.hpp>
@ -5,23 +7,22 @@
#include <gtest/gtest.h>
#include <map>
#include <memory>
#include <utility>
#include <vector>
using namespace std::chrono_literals;
TEST (socket, max_connections)
{
// this is here just so that ASSERT_TIMELY can be used
nano::system system;
auto node_flags = nano::inactive_node_flag_defaults ();
node_flags.read_only = false;
nano::inactive_node inactivenode (nano::unique_path (), node_flags);
auto node = inactivenode.node;
auto node = system.add_node ();
nano::thread_runner runner (node->io_ctx, 1);
auto server_port (nano::get_available_port ());
boost::asio::ip::tcp::endpoint listen_endpoint (boost::asio::ip::address_v6::any (), server_port);
boost::asio::ip::tcp::endpoint dst_endpoint (boost::asio::ip::address_v6::loopback (), server_port);
auto server_port = nano::get_available_port ();
boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port };
boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_port };
// start a server socket that allows max 2 live connections
auto server_socket = std::make_shared<nano::server_socket> (*node, listen_endpoint, 2);
@ -37,10 +38,10 @@ TEST (socket, max_connections)
});
// client side connection tracking
std::atomic<int> connection_attempts = 0;
std::atomic<size_t> connection_attempts = 0;
auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) {
ASSERT_EQ (ec_a.value (), 0);
connection_attempts++;
++connection_attempts;
};
// start 3 clients, 2 will persist but 1 will be dropped
@ -102,8 +103,280 @@ TEST (socket, max_connections)
ASSERT_TIMELY (5s, server_sockets.size () == 5); // connections accepted by the server
node->stop ();
runner.stop_event_processing ();
runner.join ();
}
TEST (socket, max_connections_per_ip)
{
nano::system system;
auto node = system.add_node ();
ASSERT_FALSE (node->flags.disable_max_peers_per_ip);
auto server_port = nano::get_available_port ();
boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port };
boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_port };
const auto max_ip_connections = node->network_params.network.max_peers_per_ip;
ASSERT_TRUE (max_ip_connections >= 1);
const auto max_global_connections = 1000;
auto server_socket = std::make_shared<nano::server_socket> (*node, listen_endpoint, max_global_connections);
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
// successful incoming connections are stored in server_sockets to keep them alive (server side)
std::vector<std::shared_ptr<nano::socket>> server_sockets;
server_socket->on_connection ([&server_sockets] (std::shared_ptr<nano::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
});
// client side connection tracking
std::atomic<size_t> connection_attempts = 0;
auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) {
ASSERT_EQ (ec_a.value (), 0);
++connection_attempts;
};
// start n clients, n-1 will persist but 1 will be dropped, where n == max_ip_connections
std::vector<std::shared_ptr<nano::socket>> client_list;
client_list.reserve (max_ip_connections + 1);
for (auto idx = 0; idx < max_ip_connections + 1; ++idx)
{
auto client = std::make_shared<nano::socket> (*node);
client->async_connect (dst_endpoint, connect_handler);
client_list.push_back (client);
}
auto get_tcp_max_per_ip = [&node] () {
return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in);
};
auto get_tcp_accept_successes = [&node] () {
return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
};
ASSERT_TIMELY (5s, get_tcp_accept_successes () == max_ip_connections);
ASSERT_TIMELY (5s, get_tcp_max_per_ip () == 1);
ASSERT_TIMELY (5s, connection_attempts == max_ip_connections + 1);
node->stop ();
}
TEST (socket, limited_subnet_address)
{
auto address = boost::asio::ip::make_address ("a41d:b7b2:8298:cf45:672e:bd1a:e7fb:f713");
auto network = nano::socket_functions::get_ipv6_subnet_address (address.to_v6 (), 32); // network prefix = 32.
ASSERT_EQ ("a41d:b7b2:8298:cf45:672e:bd1a:e7fb:f713/32", network.to_string ());
ASSERT_EQ ("a41d:b7b2::/32", network.canonical ().to_string ());
}
TEST (socket, first_ipv6_subnet_address)
{
auto address = boost::asio::ip::make_address ("a41d:b7b2:8298:cf45:672e:bd1a:e7fb:f713");
auto first_address = nano::socket_functions::first_ipv6_subnet_address (address.to_v6 (), 32); // network prefix = 32.
ASSERT_EQ ("a41d:b7b2::", first_address.to_string ());
}
TEST (socket, last_ipv6_subnet_address)
{
auto address = boost::asio::ip::make_address ("a41d:b7b2:8298:cf45:672e:bd1a:e7fb:f713");
auto last_address = nano::socket_functions::last_ipv6_subnet_address (address.to_v6 (), 32); // network prefix = 32.
ASSERT_EQ ("a41d:b7b2:ffff:ffff:ffff:ffff:ffff:ffff", last_address.to_string ());
}
TEST (socket, count_subnetwork_connections)
{
nano::system system;
auto node = system.add_node ();
auto address0 = boost::asio::ip::make_address ("a41d:b7b1:ffff:ffff:ffff:ffff:ffff:ffff"); // out of network prefix
auto address1 = boost::asio::ip::make_address ("a41d:b7b2:8298:cf45:672e:bd1a:e7fb:f713"); // referece address
auto address2 = boost::asio::ip::make_address ("a41d:b7b2::"); // start of the network range
auto address3 = boost::asio::ip::make_address ("a41d:b7b2::1");
auto address4 = boost::asio::ip::make_address ("a41d:b7b2:ffff:ffff:ffff:ffff:ffff:ffff"); // end of the network range
auto address5 = boost::asio::ip::make_address ("a41d:b7b3::"); // out of the network prefix
auto address6 = boost::asio::ip::make_address ("a41d:b7b3::1"); // out of the network prefix
auto connection0 = std::make_shared<nano::socket> (*node);
auto connection1 = std::make_shared<nano::socket> (*node);
auto connection2 = std::make_shared<nano::socket> (*node);
auto connection3 = std::make_shared<nano::socket> (*node);
auto connection4 = std::make_shared<nano::socket> (*node);
auto connection5 = std::make_shared<nano::socket> (*node);
auto connection6 = std::make_shared<nano::socket> (*node);
nano::address_socket_mmap connections_per_address;
connections_per_address.emplace (address0, connection0);
connections_per_address.emplace (address1, connection1);
connections_per_address.emplace (address2, connection2);
connections_per_address.emplace (address3, connection3);
connections_per_address.emplace (address4, connection4);
connections_per_address.emplace (address5, connection5);
connections_per_address.emplace (address6, connection6);
// Asserts it counts only the connections for the specified address and its network prefix.
ASSERT_EQ (4, nano::socket_functions::count_subnetwork_connections (connections_per_address, address1.to_v6 (), 32));
}
TEST (socket, max_connections_per_subnetwork)
{
nano::system system;
nano::node_flags node_flags;
// disabling IP limit because it will be used the same IP address to check they come from the same subnetwork.
node_flags.disable_max_peers_per_ip = true;
node_flags.disable_max_peers_per_subnetwork = false;
auto node = system.add_node (node_flags);
ASSERT_TRUE (node->flags.disable_max_peers_per_ip);
ASSERT_FALSE (node->flags.disable_max_peers_per_subnetwork);
auto server_port = nano::get_available_port ();
boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port };
boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_port };
const auto max_subnetwork_connections = node->network_params.network.max_peers_per_subnetwork;
ASSERT_TRUE (max_subnetwork_connections >= 1);
const auto max_global_connections = 1000;
auto server_socket = std::make_shared<nano::server_socket> (*node, listen_endpoint, max_global_connections);
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
// successful incoming connections are stored in server_sockets to keep them alive (server side)
std::vector<std::shared_ptr<nano::socket>> server_sockets;
server_socket->on_connection ([&server_sockets] (std::shared_ptr<nano::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
});
// client side connection tracking
std::atomic<size_t> connection_attempts = 0;
auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) {
ASSERT_EQ (ec_a.value (), 0);
++connection_attempts;
};
// start n clients, n-1 will persist but 1 will be dropped, where n == max_subnetwork_connections
std::vector<std::shared_ptr<nano::socket>> client_list;
client_list.reserve (max_subnetwork_connections + 1);
for (auto idx = 0; idx < max_subnetwork_connections + 1; ++idx)
{
auto client = std::make_shared<nano::socket> (*node);
client->async_connect (dst_endpoint, connect_handler);
client_list.push_back (client);
}
auto get_tcp_max_per_subnetwork = [&node] () {
return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in);
};
auto get_tcp_accept_successes = [&node] () {
return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
};
ASSERT_TIMELY (5s, get_tcp_accept_successes () == max_subnetwork_connections);
ASSERT_TIMELY (5s, get_tcp_max_per_subnetwork () == 1);
ASSERT_TIMELY (5s, connection_attempts == max_subnetwork_connections + 1);
node->stop ();
}
TEST (socket, disabled_max_peers_per_ip)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_max_peers_per_ip = true;
auto node = system.add_node (node_flags);
ASSERT_TRUE (node->flags.disable_max_peers_per_ip);
auto server_port = nano::get_available_port ();
boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port };
boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_port };
const auto max_ip_connections = node->network_params.network.max_peers_per_ip;
ASSERT_TRUE (max_ip_connections >= 1);
const auto max_global_connections = 1000;
auto server_socket = std::make_shared<nano::server_socket> (*node, listen_endpoint, max_global_connections);
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
// successful incoming connections are stored in server_sockets to keep them alive (server side)
std::vector<std::shared_ptr<nano::socket>> server_sockets;
server_socket->on_connection ([&server_sockets] (std::shared_ptr<nano::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
});
// client side connection tracking
std::atomic<size_t> connection_attempts = 0;
auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) {
ASSERT_EQ (ec_a.value (), 0);
++connection_attempts;
};
// start n clients, n-1 will persist but 1 will be dropped, where n == max_ip_connections
std::vector<std::shared_ptr<nano::socket>> client_list;
client_list.reserve (max_ip_connections + 1);
for (auto idx = 0; idx < max_ip_connections + 1; ++idx)
{
auto client = std::make_shared<nano::socket> (*node);
client->async_connect (dst_endpoint, connect_handler);
client_list.push_back (client);
}
auto get_tcp_max_per_ip = [&node] () {
return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in);
};
auto get_tcp_accept_successes = [&node] () {
return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
};
ASSERT_TIMELY (5s, get_tcp_accept_successes () == max_ip_connections + 1);
ASSERT_TIMELY (5s, get_tcp_max_per_ip () == 0);
ASSERT_TIMELY (5s, connection_attempts == max_ip_connections + 1);
node->stop ();
}
TEST (socket, disconnection_of_silent_connections)
{
nano::system system;
auto node = system.add_node ();
auto socket = std::make_shared<nano::socket> (*node);
// Classify the socket type as real-time as the disconnections are done only for this connection type.
socket->type_set (nano::socket::type_t::realtime);
// Silent connections are connections open by external peers that don't contribute with any data.
socket->set_silent_connection_tolerance_time (std::chrono::seconds{ 5 });
auto bootstrap_endpoint = node->bootstrap.endpoint ();
std::atomic<bool> connected{ false };
// Opening a connection that will be closed because it remains silent during the tolerance time.
socket->async_connect (bootstrap_endpoint, [socket, &connected] (boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
connected = true;
});
ASSERT_TIMELY (4s, connected);
// Checking the connection was closed.
ASSERT_TIMELY (10s, socket->is_closed ());
auto get_tcp_silent_connection_drops = [&node] () {
return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in);
};
ASSERT_EQ (1, get_tcp_silent_connection_drops ());
node->stop ();
}
TEST (socket, drop_policy)
@ -173,6 +446,8 @@ TEST (socket, concurrent_writes)
{
auto node_flags = nano::inactive_node_flag_defaults ();
node_flags.read_only = false;
node_flags.disable_max_peers_per_ip = true;
node_flags.disable_max_peers_per_subnetwork = true;
nano::inactive_node inactivenode (nano::unique_path (), node_flags);
auto node = inactivenode.node;

View file

@ -156,10 +156,12 @@ public:
request_interval_ms = is_dev_network () ? 20 : 500;
cleanup_period = is_dev_network () ? std::chrono::seconds (1) : std::chrono::seconds (60);
idle_timeout = is_dev_network () ? cleanup_period * 15 : cleanup_period * 2;
silent_connection_tolerance_time = std::chrono::seconds (120);
syn_cookie_cutoff = std::chrono::seconds (5);
bootstrap_interval = std::chrono::seconds (15 * 60);
max_peers_per_ip = is_dev_network () ? 10 : 5;
max_peers_per_subnetwork = max_peers_per_ip * 4;
ipv6_subnetwork_prefix_for_limiting = 64; // Equivalent to network prefix /64.
peer_dump_interval = is_dev_network () ? std::chrono::seconds (1) : std::chrono::seconds (5 * 60);
}
@ -188,12 +190,14 @@ public:
}
/** Default maximum idle time for a socket before it's automatically closed */
std::chrono::seconds idle_timeout;
std::chrono::seconds silent_connection_tolerance_time;
std::chrono::seconds syn_cookie_cutoff;
std::chrono::seconds bootstrap_interval;
/** Maximum number of peers per IP */
/** Maximum number of peers per IP. It is also the max number of connections per IP */
size_t max_peers_per_ip;
/** Maximum number of peers per subnetwork */
size_t max_peers_per_subnetwork;
size_t ipv6_subnetwork_prefix_for_limiting;
std::chrono::seconds peer_dump_interval;
/** Returns the network this object contains values for */

View file

@ -775,6 +775,15 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::tcp_max_per_ip:
res = "tcp_max_per_ip";
break;
case nano::stat::detail::tcp_max_per_subnetwork:
res = "tcp_max_per_subnetwork";
break;
case nano::stat::detail::tcp_silent_connection_drop:
res = "tcp_silent_connection_drop";
break;
case nano::stat::detail::tcp_io_timeout_drop:
res = "tcp_io_timeout_drop";
break;
case nano::stat::detail::unreachable_host:
res = "unreachable_host";
break;
@ -808,6 +817,12 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::outdated_version:
res = "outdated_version";
break;
case nano::stat::detail::udp_max_per_ip:
res = "udp_max_per_ip";
break;
case nano::stat::detail::udp_max_per_subnetwork:
res = "udp_max_per_subnetwork";
break;
case nano::stat::detail::blocks_confirmed:
res = "blocks_confirmed";
break;

View file

@ -337,6 +337,8 @@ public:
invalid_telemetry_req_message,
invalid_telemetry_ack_message,
outdated_version,
udp_max_per_ip,
udp_max_per_subnetwork,
// tcp
tcp_accept_success,
@ -345,6 +347,9 @@ public:
tcp_write_no_socket_drop,
tcp_excluded,
tcp_max_per_ip,
tcp_max_per_subnetwork,
tcp_silent_connection_drop,
tcp_io_timeout_drop,
// ipc
invocations,

View file

@ -749,5 +749,5 @@ bool nano::bootstrap_server::is_bootstrap_connection ()
bool nano::bootstrap_server::is_realtime_connection ()
{
return socket->type () == nano::socket::type_t::realtime || socket->type () == nano::socket::type_t::realtime_response_server;
return socket->is_realtime_connection ();
}

View file

@ -144,6 +144,7 @@ public:
bool disable_block_processor_republishing{ false };
bool allow_bootstrap_peers_duplicates{ false };
bool disable_max_peers_per_ip{ false }; // For testing only
bool disable_max_peers_per_subnetwork{ false }; // For testing only
bool force_use_write_database_queue{ false }; // For testing only. RocksDB does not use the database queue, but some tests rely on it being used.
bool disable_search_pending{ false }; // For testing only
bool enable_pruning{ false };

View file

@ -1,12 +1,19 @@
#include <nano/boost/asio/bind_executor.hpp>
#include <nano/boost/asio/dispatch.hpp>
#include <nano/boost/asio/ip/address.hpp>
#include <nano/boost/asio/ip/address_v6.hpp>
#include <nano/boost/asio/ip/network_v6.hpp>
#include <nano/boost/asio/read.hpp>
#include <nano/node/node.hpp>
#include <nano/node/socket.hpp>
#include <nano/node/transport/transport.hpp>
#include <boost/format.hpp>
#include <cstdint>
#include <iterator>
#include <limits>
#include <memory>
nano::socket::socket (nano::node & node_a) :
strand{ node_a.io_ctx.get_executor () },
@ -14,7 +21,9 @@ nano::socket::socket (nano::node & node_a) :
node{ node_a },
next_deadline{ std::numeric_limits<uint64_t>::max () },
last_completion_time{ 0 },
io_timeout{ node_a.config.tcp_io_timeout }
last_receive_time{ 0 },
io_timeout{ node_a.config.tcp_io_timeout },
silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time }
{
}
@ -51,6 +60,7 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buf
[this_l, buffer_a, callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a);
this_l->stop_timer ();
this_l->update_last_receive_time ();
callback_a (ec, size_a);
}));
}));
@ -117,6 +127,11 @@ void nano::socket::stop_timer ()
last_completion_time = nano::seconds_since_epoch ();
}
void nano::socket::update_last_receive_time ()
{
last_receive_time = nano::seconds_since_epoch ();
}
void nano::socket::checkup ()
{
std::weak_ptr<nano::socket> this_w (shared_from_this ());
@ -124,7 +139,18 @@ void nano::socket::checkup ()
if (auto this_l = this_w.lock ())
{
uint64_t now (nano::seconds_since_epoch ());
auto condition_to_disconnect{ false };
if (this_l->is_realtime_connection () && now - this_l->last_receive_time > this_l->silent_connection_tolerance_time.count ())
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in);
condition_to_disconnect = true;
}
if (this_l->next_deadline != std::numeric_limits<uint64_t>::max () && now - this_l->last_completion_time > this_l->next_deadline)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in);
condition_to_disconnect = true;
}
if (condition_to_disconnect)
{
if (this_l->node.config.logging.network_timeout_logging ())
{
@ -157,6 +183,14 @@ void nano::socket::timeout_set (std::chrono::seconds io_timeout_a)
io_timeout = io_timeout_a;
}
void nano::socket::set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a)
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, tolerance_time_a] () {
this_l->silent_connection_tolerance_time = tolerance_time_a;
}));
}
void nano::socket::close ()
{
auto this_l (shared_from_this ());
@ -221,17 +255,80 @@ void nano::server_socket::close ()
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l] () {
this_l->close_internal ();
this_l->acceptor.close ();
for (auto & connection_w : this_l->connections)
for (auto & address_connection_pair : this_l->connections_per_address)
{
if (auto connection_l = connection_w.lock ())
if (auto connection_l = address_connection_pair.second.lock ())
{
connection_l->close ();
}
}
this_l->connections.clear ();
this_l->connections_per_address.clear ();
}));
}
boost::asio::ip::network_v6 nano::socket_functions::get_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, size_t network_prefix)
{
return boost::asio::ip::make_network_v6 (ip_address, network_prefix);
}
boost::asio::ip::address nano::socket_functions::first_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, size_t network_prefix)
{
auto range = get_ipv6_subnet_address (ip_address, network_prefix).hosts ();
debug_assert (!range.empty ());
return *(range.begin ());
}
boost::asio::ip::address nano::socket_functions::last_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, size_t network_prefix)
{
auto range = get_ipv6_subnet_address (ip_address, network_prefix).hosts ();
debug_assert (!range.empty ());
return *(--range.end ());
}
size_t nano::socket_functions::count_subnetwork_connections (
nano::address_socket_mmap const & per_address_connections,
boost::asio::ip::address_v6 const & remote_address,
size_t network_prefix)
{
auto range = get_ipv6_subnet_address (remote_address, network_prefix).hosts ();
if (range.empty ())
{
return 0;
}
auto const first_ip = first_ipv6_subnet_address (remote_address, network_prefix);
auto const last_ip = last_ipv6_subnet_address (remote_address, network_prefix);
auto const counted_connections = std::distance (per_address_connections.lower_bound (first_ip), per_address_connections.upper_bound (last_ip));
return counted_connections;
}
bool nano::server_socket::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
if (node.flags.disable_max_peers_per_subnetwork)
{
// If the limit is disabled, then it is unreachable.
return false;
}
auto const counted_connections = socket_functions::count_subnetwork_connections (
connections_per_address,
nano::transport::mapped_from_v4_or_v6 (new_connection->remote.address ()),
node.network_params.network.ipv6_subnetwork_prefix_for_limiting);
return counted_connections >= node.network_params.network.max_peers_per_subnetwork;
}
bool nano::server_socket::limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
if (node.flags.disable_max_peers_per_ip)
{
// If the limit is disabled, then it is unreachable.
return false;
}
auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ());
auto const counted_connections = std::distance (address_connections_range.first, address_connections_range.second);
return counted_connections >= node.network_params.network.max_peers_per_ip;
}
void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nano::socket> const &, boost::system::error_code const &)> callback_a)
{
auto this_l (std::static_pointer_cast<nano::server_socket> (shared_from_this ()));
@ -250,14 +347,41 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
[this_l, new_connection, callback_a] (boost::system::error_code const & ec_a) {
this_l->evict_dead_connections ();
if (this_l->connections.size () >= this_l->max_inbound_connections)
if (this_l->connections_per_address.size () >= this_l->max_inbound_connections)
{
this_l->node.logger.always_log ("Network: max_inbound_connections reached, unable to open new connection");
this_l->node.logger.try_log ("Network: max_inbound_connections reached, unable to open new connection");
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
this_l->on_connection_requeue_delayed (callback_a);
return;
}
if (this_l->limit_reached_for_incoming_ip_connections (new_connection))
{
auto const remote_ip_address = new_connection->remote_endpoint ().address ();
auto const log_message = boost::str (
boost::format ("Network: max connections per IP (max_peers_per_ip) was reached for %1%, unable to open new connection")
% remote_ip_address.to_string ());
this_l->node.logger.try_log (log_message);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in);
this_l->on_connection_requeue_delayed (callback_a);
return;
}
if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection))
{
auto const remote_ip_address = new_connection->remote_endpoint ().address ();
debug_assert (remote_ip_address.is_v6 ());
auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork);
auto const log_message = boost::str (
boost::format ("Network: max connections per subnetwork (max_peers_per_subnetwork) was reached for subnetwork %1% (remote IP: %2%), unable to open new connection")
% remote_subnet.canonical ().to_string ()
% remote_ip_address.to_string ());
this_l->node.logger.try_log (log_message);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in);
this_l->on_connection_requeue_delayed (callback_a);
return;
}
if (!ec_a)
{
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
@ -265,7 +389,7 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
new_connection->checkup ();
new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? std::chrono::seconds (2) : this_l->node.network_params.network.idle_timeout);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections.push_back (new_connection);
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
if (callback_a (new_connection, ec_a))
{
this_l->on_connection (callback_a);
@ -329,5 +453,13 @@ bool nano::server_socket::is_temporary_error (boost::system::error_code const ec
void nano::server_socket::evict_dead_connections ()
{
debug_assert (strand.running_in_this_thread ());
connections.erase (std::remove_if (connections.begin (), connections.end (), [] (auto & connection) { return connection.expired (); }), connections.end ());
for (auto it = connections_per_address.begin (); it != connections_per_address.end ();)
{
if (it->second.expired ())
{
it = connections_per_address.erase (it);
continue;
}
++it;
}
}

View file

@ -8,9 +8,21 @@
#include <chrono>
#include <deque>
#include <map>
#include <memory>
#include <vector>
namespace boost
{
namespace asio
{
namespace ip
{
class network_v6;
}
}
}
namespace nano
{
/** Policy to affect at which stage a buffer can be dropped */
@ -60,6 +72,7 @@ public:
/** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */
void timeout_set (std::chrono::seconds io_timeout_a);
void start_timer (std::chrono::seconds deadline_a);
void set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a);
bool max () const
{
return queue_size >= queue_size_max;
@ -76,6 +89,14 @@ public:
{
type_m = type_a;
}
bool is_realtime_connection ()
{
return type () == nano::socket::type_t::realtime || type () == nano::socket::type_t::realtime_response_server;
}
bool is_closed ()
{
return closed;
}
protected:
/** Holds the buffer and callback for queued writes */
@ -95,8 +116,10 @@ protected:
std::atomic<uint64_t> next_deadline;
std::atomic<uint64_t> last_completion_time;
std::atomic<uint64_t> last_receive_time;
std::atomic<bool> timed_out{ false };
std::atomic<std::chrono::seconds> io_timeout;
std::chrono::seconds silent_connection_tolerance_time;
std::atomic<std::size_t> queue_size{ 0 };
/** Set by close() - completion handlers must check this. This is more reliable than checking
@ -105,6 +128,7 @@ protected:
void close_internal ();
void start_timer ();
void stop_timer ();
void update_last_receive_time ();
void checkup ();
private:
@ -114,6 +138,16 @@ public:
static std::size_t constexpr queue_size_max = 128;
};
using address_socket_mmap = std::multimap<boost::asio::ip::address, std::weak_ptr<socket>>;
namespace socket_functions
{
boost::asio::ip::network_v6 get_ipv6_subnet_address (boost::asio::ip::address_v6 const &, size_t);
boost::asio::ip::address first_ipv6_subnet_address (boost::asio::ip::address_v6 const &, size_t);
boost::asio::ip::address last_ipv6_subnet_address (boost::asio::ip::address_v6 const &, size_t);
size_t count_subnetwork_connections (nano::address_socket_mmap const &, boost::asio::ip::address_v6 const &, size_t);
}
/** Socket class for TCP servers */
class server_socket final : public socket
{
@ -138,12 +172,15 @@ public:
}
private:
std::vector<std::weak_ptr<nano::socket>> connections;
nano::address_socket_mmap connections_per_address;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::endpoint local;
std::size_t max_inbound_connections;
void evict_dead_connections ();
bool is_temporary_error (boost::system::error_code const ec_a);
void on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::socket> const & new_connection, boost::system::error_code const &)>);
/** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */
bool limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::socket> const & new_connection);
bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::socket> const & new_connection);
};
}

View file

@ -362,17 +362,17 @@ void nano::transport::tcp_channels::stop ()
bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a)
{
bool result (false);
if (!node.flags.disable_max_peers_per_ip)
if (node.flags.disable_max_peers_per_ip)
{
auto const address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ()));
auto const subnet (nano::transport::map_address_to_subnetwork (endpoint_a.address ()));
nano::unique_lock<nano::mutex> lock (mutex);
result = channels.get<ip_address_tag> ().count (address) >= node.network_params.network.max_peers_per_ip || channels.get<subnetwork_tag> ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork;
if (!result)
{
result = attempts.get<ip_address_tag> ().count (address) >= node.network_params.network.max_peers_per_ip || attempts.get<subnetwork_tag> ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork;
}
return false;
}
bool result{ false };
auto const address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ()));
nano::unique_lock<nano::mutex> lock (mutex);
result = channels.get<ip_address_tag> ().count (address) >= node.network_params.network.max_peers_per_ip;
if (!result)
{
result = attempts.get<ip_address_tag> ().count (address) >= node.network_params.network.max_peers_per_ip;
}
if (result)
{
@ -381,11 +381,37 @@ bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const
return result;
}
bool nano::transport::tcp_channels::max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a)
{
if (node.flags.disable_max_peers_per_subnetwork)
{
return false;
}
bool result{ false };
auto const subnet (nano::transport::map_address_to_subnetwork (endpoint_a.address ()));
nano::unique_lock<nano::mutex> lock (mutex);
result = channels.get<subnetwork_tag> ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork;
if (!result)
{
result = attempts.get<subnetwork_tag> ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork;
}
if (result)
{
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out);
}
return result;
}
bool nano::transport::tcp_channels::max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a)
{
return max_ip_connections (endpoint_a) || max_subnetwork_connections (endpoint_a);
}
bool nano::transport::tcp_channels::reachout (nano::endpoint const & endpoint_a)
{
auto tcp_endpoint (nano::transport::map_endpoint_to_tcp (endpoint_a));
// Don't overload single IP
bool error = node.network.excluded_peers.check (tcp_endpoint) || max_ip_connections (tcp_endpoint);
bool error = node.network.excluded_peers.check (tcp_endpoint) || max_ip_or_subnetwork_connections (tcp_endpoint);
if (!error && !node.flags.disable_tcp_realtime)
{
// Don't keepalive to nodes that already sent us something

View file

@ -91,7 +91,9 @@ namespace transport
void stop ();
void process_messages ();
void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr<nano::socket> const &);
bool max_ip_connections (nano::tcp_endpoint const &);
bool max_ip_connections (nano::tcp_endpoint const & endpoint_a);
bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a);
bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &);
std::unique_ptr<container_info_component> collect_container_info (std::string const &);

View file

@ -2,6 +2,9 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/transport.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/address_v4.hpp>
#include <boost/asio/ip/address_v6.hpp>
#include <boost/format.hpp>
#include <numeric>
@ -82,7 +85,7 @@ nano::tcp_endpoint nano::transport::map_endpoint_to_tcp (nano::endpoint const &
boost::asio::ip::address nano::transport::map_address_to_subnetwork (boost::asio::ip::address const & address_a)
{
debug_assert (address_a.is_v6 ());
static short const ipv6_subnet_prefix_length = 32; // Limits for /32 IPv6 subnetwork
static short const ipv6_subnet_prefix_length = 32; // Equivalent to network prefix /32.
static short const ipv4_subnet_prefix_length = (128 - 32) + 24; // Limits for /24 IPv4 subnetwork
return address_a.to_v6 ().is_v4_mapped () ? boost::asio::ip::make_network_v6 (address_a.to_v6 (), ipv4_subnet_prefix_length).network () : boost::asio::ip::make_network_v6 (address_a.to_v6 (), ipv6_subnet_prefix_length).network ();
}
@ -159,12 +162,14 @@ std::string nano::transport::channel_loopback::to_string () const
return boost::str (boost::format ("%1%") % endpoint);
}
namespace
{
boost::asio::ip::address_v6 mapped_from_v4_bytes (unsigned long address_a)
boost::asio::ip::address_v6 nano::transport::mapped_from_v4_bytes (unsigned long address_a)
{
return boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (address_a));
}
boost::asio::ip::address_v6 nano::transport::mapped_from_v4_or_v6 (boost::asio::ip::address const & address_a)
{
return address_a.is_v4 () ? boost::asio::ip::address_v6::v4_mapped (address_a.to_v4 ()) : address_a.to_v6 ();
}
bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool allow_local_peers)

View file

@ -30,6 +30,8 @@ namespace transport
nano::tcp_endpoint map_endpoint_to_tcp (nano::endpoint const &);
boost::asio::ip::address map_address_to_subnetwork (boost::asio::ip::address const &);
boost::asio::ip::address ipv4_address_or_ipv6_subnet (boost::asio::ip::address const &);
boost::asio::ip::address_v6 mapped_from_v4_bytes (unsigned long);
boost::asio::ip::address_v6 mapped_from_v4_or_v6 (boost::asio::ip::address const &);
// Unassigned, reserved, self
bool reserved_address (nano::endpoint const &, bool = false);
static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5);

View file

@ -100,7 +100,7 @@ std::shared_ptr<nano::transport::channel_udp> nano::transport::udp_channels::ins
{
debug_assert (endpoint_a.address ().is_v6 ());
std::shared_ptr<nano::transport::channel_udp> result;
if (!node.network.not_a_peer (endpoint_a, node.config.allow_local_peers) && (node.network_params.network.is_dev_network () || !max_ip_connections (endpoint_a)))
if (!node.network.not_a_peer (endpoint_a, node.config.allow_local_peers) && (node.network_params.network.is_dev_network () || !max_ip_or_subnetwork_connections (endpoint_a)))
{
nano::unique_lock<nano::mutex> lock (mutex);
auto existing (channels.get<endpoint_tag> ().find (endpoint_a));
@ -373,7 +373,7 @@ public:
}
void keepalive (nano::keepalive const & message_a) override
{
if (!node.network.udp_channels.max_ip_connections (endpoint))
if (!node.network.udp_channels.max_ip_or_subnetwork_connections (endpoint))
{
auto cookie (node.network.syn_cookies.assign (endpoint));
if (cookie)
@ -630,21 +630,45 @@ std::shared_ptr<nano::transport::channel> nano::transport::udp_channels::create
bool nano::transport::udp_channels::max_ip_connections (nano::endpoint const & endpoint_a)
{
bool result (false);
if (!node.flags.disable_max_peers_per_ip)
if (node.flags.disable_max_peers_per_ip)
{
auto const address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ()));
auto const subnet (nano::transport::map_address_to_subnetwork (endpoint_a.address ()));
nano::unique_lock<nano::mutex> lock (mutex);
result = channels.get<ip_address_tag> ().count (address) >= node.network_params.network.max_peers_per_ip || channels.get<subnetwork_tag> ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork;
return false;
}
auto const address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ()));
nano::unique_lock<nano::mutex> lock (mutex);
auto const result = channels.get<ip_address_tag> ().count (address) >= node.network_params.network.max_peers_per_ip;
if (!result)
{
node.stats.inc (nano::stat::type::udp, nano::stat::detail::udp_max_per_ip, nano::stat::dir::out);
}
return result;
}
bool nano::transport::udp_channels::max_subnetwork_connections (nano::endpoint const & endpoint_a)
{
if (node.flags.disable_max_peers_per_subnetwork)
{
return false;
}
auto const subnet (nano::transport::map_address_to_subnetwork (endpoint_a.address ()));
nano::unique_lock<nano::mutex> lock (mutex);
auto const result = channels.get<subnetwork_tag> ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork;
if (!result)
{
node.stats.inc (nano::stat::type::udp, nano::stat::detail::udp_max_per_subnetwork, nano::stat::dir::out);
}
return result;
}
bool nano::transport::udp_channels::max_ip_or_subnetwork_connections (nano::endpoint const & endpoint_a)
{
return max_ip_connections (endpoint_a) || max_subnetwork_connections (endpoint_a);
}
bool nano::transport::udp_channels::reachout (nano::endpoint const & endpoint_a)
{
// Don't overload single IP
bool error = max_ip_connections (endpoint_a);
bool error = max_ip_or_subnetwork_connections (endpoint_a);
if (!error && !node.flags.disable_udp)
{
auto endpoint_l (nano::transport::map_endpoint_to_v6 (endpoint_a));

View file

@ -96,7 +96,9 @@ namespace transport
void receive_action (nano::message_buffer *);
void process_packets ();
std::shared_ptr<nano::transport::channel> create (nano::endpoint const &);
bool max_ip_connections (nano::endpoint const &);
bool max_ip_connections (nano::endpoint const & endpoint_a);
bool max_subnetwork_connections (nano::endpoint const & endpoint_a);
bool max_ip_or_subnetwork_connections (nano::endpoint const & endpoint_a);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &);
std::unique_ptr<container_info_component> collect_container_info (std::string const &);