Merge pull request #4595 from pwojcikdev/networking-fixes/connecting-4 "Remove temporary channels"
Remove temporary channels
This commit is contained in:
commit
958c6ea08e
29 changed files with 807 additions and 618 deletions
|
@ -291,7 +291,7 @@ TEST (bootstrap_processor, process_none)
|
|||
auto node1 = system.make_disconnected_node ();
|
||||
|
||||
bool done = false;
|
||||
node0->observers.socket_accepted.add ([&] (nano::transport::socket & socket) {
|
||||
node0->observers.socket_connected.add ([&] (nano::transport::socket & socket) {
|
||||
done = true;
|
||||
});
|
||||
|
||||
|
|
|
@ -853,9 +853,9 @@ TEST (network, tcp_no_accept_excluded_peers)
|
|||
{
|
||||
node0->network.excluded_peers.add (endpoint1_tcp);
|
||||
}
|
||||
ASSERT_EQ (0, node0->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::excluded));
|
||||
ASSERT_EQ (0, node0->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded));
|
||||
node1->network.merge_peer (node0->network.endpoint ());
|
||||
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::excluded) >= 1);
|
||||
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded) >= 1);
|
||||
ASSERT_EQ (nullptr, node0->network.find_node_id (node1->get_node_id ()));
|
||||
|
||||
// Should not actively reachout to excluded peers
|
||||
|
@ -878,12 +878,12 @@ TEST (network, tcp_no_accept_excluded_peers)
|
|||
ASSERT_TIMELY_EQ (5s, node0->network.size (), 1);
|
||||
}
|
||||
|
||||
/*
|
||||
namespace nano
|
||||
{
|
||||
TEST (network, tcp_message_manager)
|
||||
{
|
||||
nano::tcp_message_manager manager (1);
|
||||
nano::tcp_message_item item;
|
||||
nano::transport::tcp_message_manager manager (1);
|
||||
item.node_id = nano::account (100);
|
||||
ASSERT_EQ (0, manager.entries.size ());
|
||||
manager.put_message (item);
|
||||
|
@ -938,6 +938,7 @@ TEST (network, tcp_message_manager)
|
|||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
TEST (network, cleanup_purge)
|
||||
{
|
||||
|
@ -1026,9 +1027,14 @@ TEST (network, filter_invalid_version_using)
|
|||
TEST (network, fill_keepalive_self)
|
||||
{
|
||||
nano::test::system system{ 2 };
|
||||
std::array<nano::endpoint, 8> target;
|
||||
system.nodes[0]->network.fill_keepalive_self (target);
|
||||
ASSERT_EQ (target[2].port (), system.nodes[1]->network.port);
|
||||
|
||||
auto get_keepalive = [&system] (nano::node & node) {
|
||||
std::array<nano::endpoint, 8> target;
|
||||
node.network.fill_keepalive_self (target);
|
||||
return target;
|
||||
};
|
||||
|
||||
ASSERT_TIMELY_EQ (5s, get_keepalive (system.node (0))[2].port (), system.nodes[1]->network.port);
|
||||
}
|
||||
|
||||
TEST (network, reconnect_cached)
|
||||
|
@ -1085,9 +1091,9 @@ TEST (network, reconnect_cached)
|
|||
/*
|
||||
* Tests that channel and channel container removes channels with dead local sockets
|
||||
*/
|
||||
TEST (network, purge_dead_channel_outgoing)
|
||||
TEST (network, purge_dead_channel)
|
||||
{
|
||||
nano::test::system system{};
|
||||
nano::test::system system;
|
||||
|
||||
nano::node_flags flags;
|
||||
// Disable non realtime sockets
|
||||
|
@ -1100,36 +1106,14 @@ TEST (network, purge_dead_channel_outgoing)
|
|||
|
||||
auto & node1 = *system.add_node (flags);
|
||||
|
||||
// We expect one incoming and one outgoing connection
|
||||
std::shared_ptr<nano::transport::socket> outgoing;
|
||||
std::shared_ptr<nano::transport::socket> incoming;
|
||||
|
||||
std::atomic<int> connected_count{ 0 };
|
||||
node1.observers.socket_connected.add ([&] (nano::transport::socket & socket) {
|
||||
connected_count++;
|
||||
outgoing = socket.shared_from_this ();
|
||||
|
||||
std::cout << "connected: " << socket.remote_endpoint () << std::endl;
|
||||
});
|
||||
|
||||
std::atomic<int> accepted_count{ 0 };
|
||||
node1.observers.socket_accepted.add ([&] (nano::transport::socket & socket) {
|
||||
accepted_count++;
|
||||
incoming = socket.shared_from_this ();
|
||||
|
||||
std::cout << "accepted: " << socket.remote_endpoint () << std::endl;
|
||||
node1.observers.socket_connected.add ([&] (nano::transport::socket & sock) {
|
||||
system.logger.debug (nano::log::type::test, "Connected: {}", sock);
|
||||
});
|
||||
|
||||
auto & node2 = *system.add_node (flags);
|
||||
|
||||
ASSERT_TIMELY_EQ (5s, connected_count, 1);
|
||||
ASSERT_ALWAYS_EQ (1s, connected_count, 1);
|
||||
|
||||
ASSERT_TIMELY_EQ (5s, accepted_count, 1);
|
||||
ASSERT_ALWAYS_EQ (1s, accepted_count, 1);
|
||||
|
||||
ASSERT_EQ (node1.network.size (), 1);
|
||||
ASSERT_ALWAYS_EQ (1s, node1.network.size (), 1);
|
||||
ASSERT_ALWAYS_EQ (500ms, node1.network.size (), 1);
|
||||
|
||||
// Store reference to the only channel
|
||||
auto channels = node1.network.list ();
|
||||
|
@ -1137,29 +1121,29 @@ TEST (network, purge_dead_channel_outgoing)
|
|||
auto channel = channels.front ();
|
||||
ASSERT_TRUE (channel);
|
||||
|
||||
auto sockets = node1.tcp_listener.sockets ();
|
||||
ASSERT_EQ (sockets.size (), 1);
|
||||
auto socket = sockets.front ();
|
||||
ASSERT_TRUE (socket);
|
||||
|
||||
// When socket is dead ensure channel knows about that
|
||||
ASSERT_TRUE (channel->alive ());
|
||||
outgoing->close ();
|
||||
ASSERT_TIMELY (5s, !channel->alive ());
|
||||
socket->close ();
|
||||
ASSERT_TIMELY (10s, !channel->alive ());
|
||||
|
||||
// Shortly after that a new channel should be established
|
||||
ASSERT_TIMELY_EQ (5s, connected_count, 2);
|
||||
ASSERT_ALWAYS_EQ (1s, connected_count, 2);
|
||||
|
||||
// Check that a new channel is healthy
|
||||
auto channels2 = node1.network.list ();
|
||||
ASSERT_EQ (channels2.size (), 1);
|
||||
auto channel2 = channels2.front ();
|
||||
ASSERT_TRUE (channel2);
|
||||
ASSERT_TRUE (channel2->alive ());
|
||||
auto channel_exists = [] (auto & node, auto & channel) {
|
||||
auto channels = node.network.list ();
|
||||
return std::find (channels.begin (), channels.end (), channel) != channels.end ();
|
||||
};
|
||||
ASSERT_TIMELY (5s, !channel_exists (node1, channel));
|
||||
}
|
||||
|
||||
/*
|
||||
* Tests that channel and channel container removes channels with dead remote sockets
|
||||
*/
|
||||
TEST (network, purge_dead_channel_incoming)
|
||||
TEST (network, purge_dead_channel_remote)
|
||||
{
|
||||
nano::test::system system{};
|
||||
nano::test::system system;
|
||||
|
||||
nano::node_flags flags;
|
||||
// Disable non realtime sockets
|
||||
|
@ -1171,37 +1155,15 @@ TEST (network, purge_dead_channel_incoming)
|
|||
flags.disable_wallet_bootstrap = true;
|
||||
|
||||
auto & node1 = *system.add_node (flags);
|
||||
|
||||
// We expect one incoming and one outgoing connection
|
||||
std::shared_ptr<nano::transport::socket> outgoing;
|
||||
std::shared_ptr<nano::transport::socket> incoming;
|
||||
|
||||
std::atomic<int> connected_count{ 0 };
|
||||
node1.observers.socket_connected.add ([&] (nano::transport::socket & socket) {
|
||||
connected_count++;
|
||||
outgoing = socket.shared_from_this ();
|
||||
|
||||
std::cout << "connected: " << socket.remote_endpoint () << std::endl;
|
||||
});
|
||||
|
||||
std::atomic<int> accepted_count{ 0 };
|
||||
node1.observers.socket_accepted.add ([&] (nano::transport::socket & socket) {
|
||||
accepted_count++;
|
||||
incoming = socket.shared_from_this ();
|
||||
|
||||
std::cout << "accepted: " << socket.remote_endpoint () << std::endl;
|
||||
});
|
||||
|
||||
auto & node2 = *system.add_node (flags);
|
||||
|
||||
ASSERT_TIMELY_EQ (5s, connected_count, 1);
|
||||
ASSERT_ALWAYS_EQ (1s, connected_count, 1);
|
||||
|
||||
ASSERT_TIMELY_EQ (5s, accepted_count, 1);
|
||||
ASSERT_ALWAYS_EQ (1s, accepted_count, 1);
|
||||
node2.observers.socket_connected.add ([&] (nano::transport::socket & sock) {
|
||||
system.logger.debug (nano::log::type::test, "Connected: {}", sock);
|
||||
});
|
||||
|
||||
ASSERT_EQ (node1.network.size (), 1);
|
||||
ASSERT_EQ (node2.network.size (), 1);
|
||||
ASSERT_ALWAYS_EQ (1s, node2.network.size (), 1);
|
||||
ASSERT_ALWAYS_EQ (500ms, std::min (node1.network.size (), node2.network.size ()), 1);
|
||||
|
||||
// Store reference to the only channel
|
||||
auto channels = node2.network.list ();
|
||||
|
@ -1209,19 +1171,19 @@ TEST (network, purge_dead_channel_incoming)
|
|||
auto channel = channels.front ();
|
||||
ASSERT_TRUE (channel);
|
||||
|
||||
auto sockets = node1.tcp_listener.sockets ();
|
||||
ASSERT_EQ (sockets.size (), 1);
|
||||
auto socket = sockets.front ();
|
||||
ASSERT_TRUE (socket);
|
||||
|
||||
// When remote socket is dead ensure channel knows about that
|
||||
ASSERT_TRUE (channel->alive ());
|
||||
incoming->close ();
|
||||
socket->close ();
|
||||
ASSERT_TIMELY (5s, !channel->alive ());
|
||||
|
||||
// Shortly after that a new channel should be established
|
||||
ASSERT_TIMELY_EQ (5s, accepted_count, 2);
|
||||
ASSERT_ALWAYS_EQ (1s, accepted_count, 2);
|
||||
|
||||
// Check that a new channel is healthy
|
||||
auto channels2 = node2.network.list ();
|
||||
ASSERT_EQ (channels2.size (), 1);
|
||||
auto channel2 = channels2.front ();
|
||||
ASSERT_TRUE (channel2);
|
||||
ASSERT_TRUE (channel2->alive ());
|
||||
auto channel_exists = [] (auto & node, auto & channel) {
|
||||
auto channels = node.network.list ();
|
||||
return std::find (channels.begin (), channels.end (), channel) != channels.end ();
|
||||
};
|
||||
ASSERT_TIMELY (5s, !channel_exists (node2, channel));
|
||||
}
|
||||
|
|
|
@ -26,8 +26,11 @@ TEST (peer_history, erase_old)
|
|||
{
|
||||
nano::test::system system;
|
||||
|
||||
auto & node1 = *system.add_node ();
|
||||
auto & node2 = *system.add_node ();
|
||||
auto node_config = system.default_config ();
|
||||
node_config.peer_history.erase_cutoff = 1s;
|
||||
|
||||
auto & node1 = *system.add_node (node_config);
|
||||
auto & node2 = *system.add_node (node_config);
|
||||
|
||||
ASSERT_TIMELY (5s, node1.peer_history.exists (node2.network.endpoint ()));
|
||||
ASSERT_TIMELY (5s, node2.peer_history.exists (node1.network.endpoint ()));
|
||||
|
@ -41,6 +44,7 @@ TEST (peer_history, erase_old)
|
|||
ASSERT_EQ (cached1.size (), 1);
|
||||
ASSERT_EQ (cached1[0], node2_endpoint);
|
||||
|
||||
ASSERT_TIMELY_EQ (5s, node1.network.size (), 0);
|
||||
ASSERT_TIMELY (5s, !node1.peer_history.exists (node2_endpoint));
|
||||
|
||||
auto cached2 = node1.peer_history.peers ();
|
||||
|
|
|
@ -89,12 +89,15 @@ TEST (rep_crawler, rep_weight)
|
|||
ASSERT_TRUE (nano::test::process (node2, { block1, block2, block3, block4 }));
|
||||
ASSERT_TRUE (nano::test::process (node3, { block1, block2, block3, block4 }));
|
||||
ASSERT_TRUE (node.rep_crawler.representatives (1).empty ());
|
||||
std::shared_ptr<nano::transport::channel> channel1 = nano::test::establish_tcp (system, node, node1.network.endpoint ());
|
||||
|
||||
ASSERT_TIMELY (5s, node.network.size () == 3);
|
||||
auto channel1 = node.network.find_node_id (node1.node_id.pub);
|
||||
auto channel2 = node.network.find_node_id (node2.node_id.pub);
|
||||
auto channel3 = node.network.find_node_id (node3.node_id.pub);
|
||||
ASSERT_NE (nullptr, channel1);
|
||||
std::shared_ptr<nano::transport::channel> channel2 = nano::test::establish_tcp (system, node, node2.network.endpoint ());
|
||||
ASSERT_NE (nullptr, channel2);
|
||||
std::shared_ptr<nano::transport::channel> channel3 = nano::test::establish_tcp (system, node, node3.network.endpoint ());
|
||||
ASSERT_NE (nullptr, channel3);
|
||||
|
||||
auto vote0 = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () });
|
||||
auto vote1 = std::make_shared<nano::vote> (keypair1.pub, keypair1.prv, 0, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () });
|
||||
auto vote2 = std::make_shared<nano::vote> (keypair2.pub, keypair2.prv, 0, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () });
|
||||
|
@ -227,8 +230,6 @@ TEST (rep_crawler, rep_remove)
|
|||
reps = searching_node.rep_crawler.representatives (1);
|
||||
ASSERT_EQ (nano::dev::genesis_key.pub, reps[0].account);
|
||||
ASSERT_TIMELY_EQ (5s, searching_node.network.size (), 1);
|
||||
auto list (searching_node.network.list (1));
|
||||
ASSERT_EQ (node_genesis_rep->network.endpoint (), list[0]->get_endpoint ());
|
||||
}
|
||||
|
||||
TEST (rep_crawler, rep_connection_close)
|
||||
|
|
|
@ -36,7 +36,10 @@ TEST (socket, max_connections)
|
|||
std::mutex server_sockets_mutex;
|
||||
|
||||
// start a server socket that allows max 2 live connections
|
||||
nano::transport::tcp_listener listener{ server_port, *node, 2 };
|
||||
nano::transport::tcp_config tcp_config{ nano::dev::network_params.network };
|
||||
tcp_config.max_inbound_connections = 2;
|
||||
|
||||
nano::transport::tcp_listener listener{ server_port, tcp_config, *node };
|
||||
listener.connection_accepted.add ([&] (auto const & socket, auto const & server) {
|
||||
std::lock_guard guard{ server_sockets_mutex };
|
||||
server_sockets.push_back (socket);
|
||||
|
@ -63,12 +66,12 @@ TEST (socket, max_connections)
|
|||
auto client3 = std::make_shared<nano::transport::socket> (*node);
|
||||
client3->async_connect (dst_endpoint, connect_handler);
|
||||
|
||||
auto get_tcp_accept_failures = [&node] () {
|
||||
return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in);
|
||||
auto get_tcp_accept_failures = [] (auto & node) {
|
||||
return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure);
|
||||
};
|
||||
|
||||
auto get_tcp_accept_successes = [&node] () {
|
||||
return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in);
|
||||
auto get_tcp_accept_successes = [] (auto & node) {
|
||||
return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success);
|
||||
};
|
||||
|
||||
auto server_sockets_size = [&] () {
|
||||
|
@ -76,8 +79,8 @@ TEST (socket, max_connections)
|
|||
return server_sockets.size ();
|
||||
};
|
||||
|
||||
ASSERT_TIMELY_EQ (10s, get_tcp_accept_successes (), 2);
|
||||
ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (), 2);
|
||||
ASSERT_TIMELY_EQ (10s, get_tcp_accept_successes (node), 2);
|
||||
ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (node), 2);
|
||||
ASSERT_TIMELY_EQ (5s, connection_attempts, 3);
|
||||
ASSERT_TIMELY_EQ (5s, server_sockets_size (), 2);
|
||||
|
||||
|
@ -94,8 +97,8 @@ TEST (socket, max_connections)
|
|||
auto client5 = std::make_shared<nano::transport::socket> (*node);
|
||||
client5->async_connect (dst_endpoint, connect_handler);
|
||||
|
||||
ASSERT_TIMELY_EQ (10s, get_tcp_accept_successes (), 3);
|
||||
ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (), 3);
|
||||
ASSERT_TIMELY_EQ (10s, get_tcp_accept_successes (node), 3);
|
||||
ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (node), 3);
|
||||
ASSERT_TIMELY_EQ (5s, connection_attempts, 5);
|
||||
ASSERT_TIMELY_EQ (5s, server_sockets.size (), 3);
|
||||
|
||||
|
@ -116,8 +119,8 @@ TEST (socket, max_connections)
|
|||
auto client8 = std::make_shared<nano::transport::socket> (*node);
|
||||
client8->async_connect (dst_endpoint, connect_handler);
|
||||
|
||||
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 5);
|
||||
ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (), 5);
|
||||
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (node), 5);
|
||||
ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (node), 5);
|
||||
ASSERT_TIMELY_EQ (5s, connection_attempts, 8); // connections initiated by the client
|
||||
ASSERT_TIMELY_EQ (5s, server_sockets_size (), 5); // connections accepted by the server
|
||||
}
|
||||
|
@ -144,7 +147,10 @@ TEST (socket, max_connections_per_ip)
|
|||
// successful incoming connections are stored in server_sockets to keep them alive (server side)
|
||||
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;
|
||||
|
||||
nano::transport::tcp_listener listener{ server_port, *node, max_global_connections };
|
||||
nano::transport::tcp_config tcp_config{ nano::dev::network_params.network };
|
||||
tcp_config.max_inbound_connections = max_global_connections;
|
||||
|
||||
nano::transport::tcp_listener listener{ server_port, tcp_config, *node };
|
||||
listener.connection_accepted.add ([&server_sockets] (auto const & socket, auto const & server) {
|
||||
server_sockets.push_back (socket);
|
||||
});
|
||||
|
@ -170,16 +176,8 @@ TEST (socket, max_connections_per_ip)
|
|||
client_list.push_back (client);
|
||||
}
|
||||
|
||||
auto get_tcp_max_per_ip = [&node] () {
|
||||
return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, nano::stat::dir::in);
|
||||
};
|
||||
|
||||
auto get_tcp_accept_successes = [&node] () {
|
||||
return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in);
|
||||
};
|
||||
|
||||
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), max_ip_connections);
|
||||
ASSERT_TIMELY_EQ (5s, get_tcp_max_per_ip (), 1);
|
||||
ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), max_ip_connections);
|
||||
ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_ip), 1);
|
||||
ASSERT_TIMELY_EQ (5s, connection_attempts, max_ip_connections + 1);
|
||||
}
|
||||
|
||||
|
@ -267,7 +265,10 @@ TEST (socket, max_connections_per_subnetwork)
|
|||
// successful incoming connections are stored in server_sockets to keep them alive (server side)
|
||||
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;
|
||||
|
||||
nano::transport::tcp_listener listener{ server_port, *node, max_global_connections };
|
||||
nano::transport::tcp_config tcp_config{ nano::dev::network_params.network };
|
||||
tcp_config.max_inbound_connections = max_global_connections;
|
||||
|
||||
nano::transport::tcp_listener listener{ server_port, tcp_config, *node };
|
||||
listener.connection_accepted.add ([&server_sockets] (auto const & socket, auto const & server) {
|
||||
server_sockets.push_back (socket);
|
||||
});
|
||||
|
@ -293,16 +294,8 @@ TEST (socket, max_connections_per_subnetwork)
|
|||
client_list.push_back (client);
|
||||
}
|
||||
|
||||
auto get_tcp_max_per_subnetwork = [&node] () {
|
||||
return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::max_per_subnetwork, nano::stat::dir::in);
|
||||
};
|
||||
|
||||
auto get_tcp_accept_successes = [&node] () {
|
||||
return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in);
|
||||
};
|
||||
|
||||
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), max_subnetwork_connections);
|
||||
ASSERT_TIMELY_EQ (5s, get_tcp_max_per_subnetwork (), 1);
|
||||
ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), max_subnetwork_connections);
|
||||
ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_subnetwork), 1);
|
||||
ASSERT_TIMELY_EQ (5s, connection_attempts, max_subnetwork_connections + 1);
|
||||
}
|
||||
|
||||
|
@ -330,7 +323,10 @@ TEST (socket, disabled_max_peers_per_ip)
|
|||
// successful incoming connections are stored in server_sockets to keep them alive (server side)
|
||||
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;
|
||||
|
||||
nano::transport::tcp_listener listener = { server_port, *node, max_global_connections };
|
||||
nano::transport::tcp_config tcp_config{ nano::dev::network_params.network };
|
||||
tcp_config.max_inbound_connections = max_global_connections;
|
||||
|
||||
nano::transport::tcp_listener listener = { server_port, tcp_config, *node };
|
||||
listener.connection_accepted.add ([&server_sockets] (auto const & socket, auto const & server) {
|
||||
server_sockets.push_back (socket);
|
||||
});
|
||||
|
@ -356,16 +352,8 @@ TEST (socket, disabled_max_peers_per_ip)
|
|||
client_list.push_back (client);
|
||||
}
|
||||
|
||||
auto get_tcp_max_per_ip = [&node] () {
|
||||
return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, nano::stat::dir::in);
|
||||
};
|
||||
|
||||
auto get_tcp_accept_successes = [&node] () {
|
||||
return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in);
|
||||
};
|
||||
|
||||
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), max_ip_connections + 1);
|
||||
ASSERT_TIMELY_EQ (5s, get_tcp_max_per_ip (), 0);
|
||||
ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), max_ip_connections + 1);
|
||||
ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_ip), 0);
|
||||
ASSERT_TIMELY_EQ (5s, connection_attempts, max_ip_connections + 1);
|
||||
}
|
||||
|
||||
|
|
|
@ -1023,8 +1023,11 @@ TEST (websocket, telemetry)
|
|||
|
||||
ASSERT_TRUE (nano::test::compare_telemetry (telemetry_data, *node2));
|
||||
|
||||
ASSERT_EQ (contents.get<std::string> ("address"), node2->network.endpoint ().address ().to_string ());
|
||||
ASSERT_EQ (contents.get<uint16_t> ("port"), node2->network.endpoint ().port ());
|
||||
auto channel2 = node2->network.find_node_id (node1->get_node_id ());
|
||||
ASSERT_NE (channel2, nullptr);
|
||||
|
||||
ASSERT_EQ (contents.get<std::string> ("address"), channel2->get_local_endpoint ().address ().to_string ());
|
||||
ASSERT_EQ (contents.get<uint16_t> ("port"), channel2->get_local_endpoint ().port ());
|
||||
|
||||
// Other node should have no subscribers
|
||||
EXPECT_EQ (0, node2->websocket.server->subscriber_count (nano::websocket::topic::telemetry));
|
||||
|
|
|
@ -31,7 +31,9 @@ enum class type
|
|||
ipc,
|
||||
tcp,
|
||||
tcp_channels,
|
||||
tcp_channels_rejected,
|
||||
tcp_listener,
|
||||
tcp_listener_rejected,
|
||||
channel,
|
||||
socket,
|
||||
confirmation_height,
|
||||
|
@ -259,12 +261,26 @@ enum class detail
|
|||
accept_success,
|
||||
accept_error,
|
||||
accept_failure,
|
||||
accept_limits_exceeded,
|
||||
accept_rejected,
|
||||
close_error,
|
||||
max_per_ip,
|
||||
max_per_subnetwork,
|
||||
max_attempts,
|
||||
max_attempts_per_ip,
|
||||
excluded,
|
||||
erase_dead,
|
||||
connect_initiate,
|
||||
connect_failure,
|
||||
connect_error,
|
||||
connect_rejected,
|
||||
connect_success,
|
||||
attempt_timeout,
|
||||
not_a_peer,
|
||||
|
||||
// tcp_channels
|
||||
channel_accepted,
|
||||
channel_rejected,
|
||||
channel_duplicate,
|
||||
|
||||
// tcp_server
|
||||
handshake,
|
||||
|
|
|
@ -17,15 +17,15 @@ nano::thread_runner::thread_runner (std::shared_ptr<boost::asio::io_context> io_
|
|||
{
|
||||
debug_assert (io_ctx != nullptr);
|
||||
|
||||
logger.debug (nano::log::type::thread_runner, "Starting threads: {} ({})", num_threads, to_string (role));
|
||||
|
||||
for (auto i (0u); i < num_threads; ++i)
|
||||
{
|
||||
threads.emplace_back (nano::thread_attributes::get_default (), [this, i] () {
|
||||
nano::thread_role::set (role);
|
||||
try
|
||||
{
|
||||
logger.debug (nano::log::type::thread_runner, "Started thread #{} ({})", i, to_string (role));
|
||||
run ();
|
||||
logger.debug (nano::log::type::thread_runner, "Stopped thread #{} ({})", i, to_string (role));
|
||||
}
|
||||
catch (std::exception const & ex)
|
||||
{
|
||||
|
@ -88,6 +88,9 @@ void nano::thread_runner::join ()
|
|||
i.join ();
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug (nano::log::type::thread_runner, "Stopped threads ({})", to_string (role));
|
||||
|
||||
io_ctx.reset ();
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ nano::bootstrap_client::bootstrap_client (std::shared_ptr<nano::node> const & no
|
|||
{
|
||||
++node_a->bootstrap_initiator.connections->connections_count;
|
||||
receive_buffer->resize (256);
|
||||
channel->update_endpoint ();
|
||||
channel->update_endpoints ();
|
||||
}
|
||||
|
||||
nano::bootstrap_client::~bootstrap_client ()
|
||||
|
|
|
@ -17,6 +17,8 @@ nano::confirmation_solicitor::confirmation_solicitor (nano::network & network_a,
|
|||
void nano::confirmation_solicitor::prepare (std::vector<nano::representative> const & representatives_a)
|
||||
{
|
||||
debug_assert (!prepared);
|
||||
debug_assert (std::none_of (representatives_a.begin (), representatives_a.end (), [] (auto const & rep) { return rep.channel == nullptr; }));
|
||||
|
||||
requests.clear ();
|
||||
rebroadcasted = 0;
|
||||
/** Two copies are required as representatives can be erased from \p representatives_requests */
|
||||
|
|
|
@ -3060,6 +3060,10 @@ void nano::json_handler::peers ()
|
|||
}
|
||||
debug_assert (channel->get_type () == nano::transport::transport_type::tcp);
|
||||
pending_tree.put ("type", "tcp");
|
||||
|
||||
auto peering_endpoint = channel->get_peering_endpoint ();
|
||||
pending_tree.put ("peering", boost::lexical_cast<std::string> (peering_endpoint));
|
||||
|
||||
peers_l.push_back (boost::property_tree::ptree::value_type (text.str (), pending_tree));
|
||||
}
|
||||
else
|
||||
|
|
|
@ -363,6 +363,7 @@ public:
|
|||
auto peer0 (message_a.peers[0]);
|
||||
if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
|
||||
{
|
||||
// TODO: Remove this as we do not need to establish a second connection to the same peer
|
||||
nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ());
|
||||
node.network.merge_peer (new_endpoint);
|
||||
|
||||
|
@ -733,13 +734,13 @@ nano::node_id_handshake::response_payload nano::network::prepare_handshake_respo
|
|||
* tcp_message_manager
|
||||
*/
|
||||
|
||||
nano::tcp_message_manager::tcp_message_manager (unsigned incoming_connections_max_a) :
|
||||
max_entries (incoming_connections_max_a * nano::tcp_message_manager::max_entries_per_connection + 1)
|
||||
nano::transport::tcp_message_manager::tcp_message_manager (unsigned incoming_connections_max_a) :
|
||||
max_entries (incoming_connections_max_a * max_entries_per_connection + 1)
|
||||
{
|
||||
debug_assert (max_entries > 0);
|
||||
}
|
||||
|
||||
void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item_a)
|
||||
void nano::transport::tcp_message_manager::put (std::unique_ptr<nano::message> message, std::shared_ptr<nano::transport::channel_tcp> channel)
|
||||
{
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
|
@ -747,14 +748,14 @@ void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item
|
|||
{
|
||||
producer_condition.wait (lock);
|
||||
}
|
||||
entries.push_back (item_a);
|
||||
entries.emplace_back (std::move (message), channel);
|
||||
}
|
||||
consumer_condition.notify_one ();
|
||||
}
|
||||
|
||||
nano::tcp_message_item nano::tcp_message_manager::get_message ()
|
||||
auto nano::transport::tcp_message_manager::next () -> entry_t
|
||||
{
|
||||
nano::tcp_message_item result;
|
||||
entry_t result{ nullptr, nullptr };
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
while (entries.empty () && !stopped)
|
||||
{
|
||||
|
@ -765,16 +766,12 @@ nano::tcp_message_item nano::tcp_message_manager::get_message ()
|
|||
result = std::move (entries.front ());
|
||||
entries.pop_front ();
|
||||
}
|
||||
else
|
||||
{
|
||||
result = nano::tcp_message_item{ nullptr, nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr };
|
||||
}
|
||||
lock.unlock ();
|
||||
producer_condition.notify_one ();
|
||||
return result;
|
||||
}
|
||||
|
||||
void nano::tcp_message_manager::stop ()
|
||||
void nano::transport::tcp_message_manager::stop ()
|
||||
{
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
|
|
|
@ -171,7 +171,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
|
|||
// Thus, be very careful if you change the order: if `bootstrap` gets constructed before `network`,
|
||||
// the latter would inherit the port from the former (if TCP is active, otherwise `network` picks first)
|
||||
//
|
||||
tcp_listener_impl{ std::make_unique<nano::transport::tcp_listener> (network.port, *this, config.tcp_incoming_connections_max) },
|
||||
tcp_listener_impl{ std::make_unique<nano::transport::tcp_listener> (network.port, config.tcp, *this) },
|
||||
tcp_listener{ *tcp_listener_impl },
|
||||
application_path (application_path_a),
|
||||
port_mapping (*this),
|
||||
|
@ -660,11 +660,11 @@ void nano::node::start ()
|
|||
network.port = tcp_listener.endpoint ().port ();
|
||||
}
|
||||
|
||||
logger.info (nano::log::type::node, "Node peering port: {}", network.port.load ());
|
||||
logger.info (nano::log::type::node, "Peering port: {}", network.port.load ());
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.warn (nano::log::type::node, "Node peering is disabled");
|
||||
logger.warn (nano::log::type::node, "Peering is disabled");
|
||||
}
|
||||
|
||||
if (!flags.disable_backup)
|
||||
|
|
|
@ -31,9 +31,7 @@ public:
|
|||
nano::observer_set<> disconnect;
|
||||
nano::observer_set<nano::root const &> work_cancel;
|
||||
nano::observer_set<nano::telemetry_data const &, std::shared_ptr<nano::transport::channel> const &> telemetry;
|
||||
|
||||
nano::observer_set<nano::transport::socket &> socket_connected;
|
||||
nano::observer_set<nano::transport::socket &> socket_accepted;
|
||||
};
|
||||
|
||||
std::unique_ptr<container_info_component> collect_container_info (node_observers & node_observers, std::string const & name);
|
||||
|
|
|
@ -35,7 +35,8 @@ nano::node_config::node_config (const std::optional<uint16_t> & peering_port_a,
|
|||
external_address{ boost::asio::ip::address_v6{}.to_string () },
|
||||
rep_crawler{ network_params.network },
|
||||
block_processor{ network_params.network },
|
||||
peer_history{ network_params.network }
|
||||
peer_history{ network_params.network },
|
||||
tcp{ network_params.network }
|
||||
{
|
||||
if (peering_port == 0)
|
||||
{
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include <nano/node/repcrawler.hpp>
|
||||
#include <nano/node/scheduler/hinted.hpp>
|
||||
#include <nano/node/scheduler/optimistic.hpp>
|
||||
#include <nano/node/transport/tcp_listener.hpp>
|
||||
#include <nano/node/vote_cache.hpp>
|
||||
#include <nano/node/vote_processor.hpp>
|
||||
#include <nano/node/websocketconfig.hpp>
|
||||
|
@ -143,6 +144,7 @@ public:
|
|||
nano::block_processor_config block_processor;
|
||||
nano::vote_processor_config vote_processor;
|
||||
nano::peer_history_config peer_history;
|
||||
nano::transport::tcp_config tcp;
|
||||
|
||||
public:
|
||||
std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;
|
||||
|
|
|
@ -86,7 +86,7 @@ void nano::peer_history::run_one ()
|
|||
// Add or update live peers
|
||||
for (auto const & peer : live_peers)
|
||||
{
|
||||
auto const endpoint = peer->get_endpoint ();
|
||||
auto const endpoint = peer->get_peering_endpoint ();
|
||||
bool const exists = store.peer.exists (transaction, endpoint);
|
||||
store.peer.put (transaction, endpoint, nano::milliseconds_since_epoch ());
|
||||
if (!exists)
|
||||
|
@ -141,7 +141,7 @@ nano::peer_history_config::peer_history_config (nano::network_constants const &
|
|||
if (network.is_dev_network ())
|
||||
{
|
||||
check_interval = 1s;
|
||||
erase_cutoff = 3s;
|
||||
erase_cutoff = 10s;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ public:
|
|||
virtual std::string to_string () const = 0;
|
||||
virtual nano::endpoint get_endpoint () const = 0;
|
||||
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;
|
||||
virtual nano::endpoint get_local_endpoint () const = 0;
|
||||
virtual nano::transport::transport_type get_type () const = 0;
|
||||
|
||||
virtual bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic)
|
||||
|
|
35
nano/node/transport/common.hpp
Normal file
35
nano/node/transport/common.hpp
Normal file
|
@ -0,0 +1,35 @@
|
|||
#pragma once
|
||||
|
||||
#include <string_view>
|
||||
|
||||
namespace nano::transport
|
||||
{
|
||||
/** Policy to affect at which stage a buffer can be dropped */
|
||||
enum class buffer_drop_policy
|
||||
{
|
||||
/** Can be dropped by bandwidth limiter (default) */
|
||||
limiter,
|
||||
/** Should not be dropped by bandwidth limiter */
|
||||
no_limiter_drop,
|
||||
/** Should not be dropped by bandwidth limiter or socket write queue limiter */
|
||||
no_socket_drop
|
||||
};
|
||||
|
||||
enum class socket_type
|
||||
{
|
||||
undefined,
|
||||
bootstrap,
|
||||
realtime,
|
||||
realtime_response_server // special type for tcp channel response server
|
||||
};
|
||||
|
||||
std::string_view to_string (socket_type);
|
||||
|
||||
enum class socket_endpoint
|
||||
{
|
||||
server, // Socket was created by accepting an incoming connection
|
||||
client, // Socket was created by initiating an outgoing connection
|
||||
};
|
||||
|
||||
std::string_view to_string (socket_endpoint);
|
||||
}
|
|
@ -40,6 +40,11 @@ namespace transport
|
|||
return nano::transport::map_endpoint_to_tcp (endpoint);
|
||||
}
|
||||
|
||||
nano::endpoint get_local_endpoint () const override
|
||||
{
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
nano::transport::transport_type get_type () const override
|
||||
{
|
||||
return nano::transport::transport_type::fake;
|
||||
|
|
|
@ -32,6 +32,11 @@ namespace transport
|
|||
return nano::transport::map_endpoint_to_tcp (endpoint);
|
||||
}
|
||||
|
||||
nano::endpoint get_local_endpoint () const override
|
||||
{
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
nano::transport::transport_type get_type () const override
|
||||
{
|
||||
return nano::transport::transport_type::loopback;
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
#include <nano/lib/locks.hpp>
|
||||
#include <nano/lib/logging.hpp>
|
||||
#include <nano/lib/timer.hpp>
|
||||
#include <nano/node/transport/common.hpp>
|
||||
#include <nano/node/transport/traffic_type.hpp>
|
||||
|
||||
#include <chrono>
|
||||
|
@ -28,35 +29,6 @@ class node;
|
|||
|
||||
namespace nano::transport
|
||||
{
|
||||
/** Policy to affect at which stage a buffer can be dropped */
|
||||
enum class buffer_drop_policy
|
||||
{
|
||||
/** Can be dropped by bandwidth limiter (default) */
|
||||
limiter,
|
||||
/** Should not be dropped by bandwidth limiter */
|
||||
no_limiter_drop,
|
||||
/** Should not be dropped by bandwidth limiter or socket write queue limiter */
|
||||
no_socket_drop
|
||||
};
|
||||
|
||||
enum class socket_type
|
||||
{
|
||||
undefined,
|
||||
bootstrap,
|
||||
realtime,
|
||||
realtime_response_server // special type for tcp channel response server
|
||||
};
|
||||
|
||||
std::string_view to_string (socket_type);
|
||||
|
||||
enum class socket_endpoint
|
||||
{
|
||||
server, // Socket was created by accepting an incoming connection
|
||||
client, // Socket was created by initiating an outgoing connection
|
||||
};
|
||||
|
||||
std::string_view to_string (socket_endpoint);
|
||||
|
||||
/** Socket class for tcp clients and newly accepted connections */
|
||||
class socket final : public std::enable_shared_from_this<socket>
|
||||
{
|
||||
|
@ -71,7 +43,7 @@ public:
|
|||
explicit socket (nano::node &, nano::transport::socket_endpoint = socket_endpoint::client, std::size_t max_queue_size = default_max_queue_size);
|
||||
|
||||
// TODO: Accepting remote/local endpoints as a parameter is unnecessary, but is needed for now to keep compatibility with the legacy code
|
||||
explicit socket (
|
||||
socket (
|
||||
nano::node &,
|
||||
boost::asio::ip::tcp::socket,
|
||||
boost::asio::ip::tcp::endpoint remote_endpoint,
|
||||
|
@ -125,7 +97,7 @@ public:
|
|||
}
|
||||
bool is_realtime_connection () const
|
||||
{
|
||||
return type () == socket_type::realtime || type () == socket_type::realtime_response_server;
|
||||
return type () == socket_type::realtime;
|
||||
}
|
||||
bool is_bootstrap_connection () const
|
||||
{
|
||||
|
|
|
@ -23,10 +23,21 @@ nano::transport::channel_tcp::~channel_tcp ()
|
|||
// Close socket. Exception: socket is used by tcp_server
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
if (!temporary)
|
||||
{
|
||||
socket_l->close ();
|
||||
}
|
||||
socket_l->close ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::transport::channel_tcp::update_endpoints ()
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk (channel_mutex);
|
||||
|
||||
debug_assert (endpoint == nano::endpoint{}); // Not initialized endpoint value
|
||||
debug_assert (local_endpoint == nano::endpoint{}); // Not initialized endpoint value
|
||||
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
endpoint = socket_l->remote_endpoint ();
|
||||
local_endpoint = socket_l->local_endpoint ();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,31 +156,87 @@ void nano::transport::tcp_channels::close ()
|
|||
channels.clear ();
|
||||
}
|
||||
|
||||
bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::channel_tcp> const & channel_a, std::shared_ptr<nano::transport::socket> const & socket_a, std::shared_ptr<nano::transport::tcp_server> const & server_a)
|
||||
bool nano::transport::tcp_channels::check (const nano::tcp_endpoint & endpoint, const nano::account & node_id) const
|
||||
{
|
||||
auto endpoint (channel_a->get_tcp_endpoint ());
|
||||
debug_assert (endpoint.address ().is_v6 ());
|
||||
auto udp_endpoint (nano::transport::map_tcp_to_endpoint (endpoint));
|
||||
bool error (true);
|
||||
if (!node.network.not_a_peer (udp_endpoint, node.config.allow_local_peers) && !stopped)
|
||||
debug_assert (!mutex.try_lock ());
|
||||
|
||||
if (stopped)
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
auto existing (channels.get<endpoint_tag> ().find (endpoint));
|
||||
if (existing == channels.get<endpoint_tag> ().end ())
|
||||
{
|
||||
auto node_id (channel_a->get_node_id ());
|
||||
if (!channel_a->temporary)
|
||||
{
|
||||
channels.get<node_id_tag> ().erase (node_id);
|
||||
}
|
||||
channels.get<endpoint_tag> ().emplace (channel_a, socket_a, server_a);
|
||||
attempts.get<endpoint_tag> ().erase (endpoint);
|
||||
error = false;
|
||||
lock.unlock ();
|
||||
node.network.channel_observer (channel_a);
|
||||
}
|
||||
return false; // Reject
|
||||
}
|
||||
return error;
|
||||
|
||||
if (node.network.not_a_peer (nano::transport::map_tcp_to_endpoint (endpoint), node.config.allow_local_peers))
|
||||
{
|
||||
node.stats.inc (nano::stat::type::tcp_channels_rejected, nano::stat::detail::not_a_peer);
|
||||
node.logger.debug (nano::log::type::tcp_channels, "Rejected invalid endpoint channel from: {}", fmt::streamed (endpoint));
|
||||
|
||||
return false; // Reject
|
||||
}
|
||||
|
||||
bool has_duplicate = std::any_of (channels.begin (), channels.end (), [&endpoint, &node_id] (auto const & channel) {
|
||||
if (nano::transport::is_same_ip (channel.endpoint ().address (), endpoint.address ()))
|
||||
{
|
||||
// Only counsider channels with the same node id as duplicates if they come from the same IP
|
||||
if (channel.node_id () == node_id)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
if (has_duplicate)
|
||||
{
|
||||
node.stats.inc (nano::stat::type::tcp_channels_rejected, nano::stat::detail::channel_duplicate);
|
||||
node.logger.debug (nano::log::type::tcp_channels, "Duplicate channel rejected from: {} ({})", fmt::streamed (endpoint), node_id.to_node_id ());
|
||||
|
||||
return false; // Reject
|
||||
}
|
||||
|
||||
return true; // OK
|
||||
}
|
||||
|
||||
// This should be the only place in node where channels are created
|
||||
std::shared_ptr<nano::transport::channel_tcp> nano::transport::tcp_channels::create (const std::shared_ptr<nano::transport::socket> & socket, const std::shared_ptr<nano::transport::tcp_server> & server, const nano::account & node_id)
|
||||
{
|
||||
auto const endpoint = socket->remote_endpoint ();
|
||||
debug_assert (endpoint.address ().is_v6 ());
|
||||
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
|
||||
if (stopped)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (!check (endpoint, node_id))
|
||||
{
|
||||
node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::channel_rejected);
|
||||
node.logger.debug (nano::log::type::tcp_channels, "Rejected new channel from: {} ({})", fmt::streamed (endpoint), node_id.to_node_id ());
|
||||
// Rejection reason should be logged earlier
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::channel_accepted);
|
||||
node.logger.debug (nano::log::type::tcp_channels, "Accepted new channel from: {} ({})",
|
||||
fmt::streamed (socket->remote_endpoint ()),
|
||||
node_id.to_node_id ());
|
||||
|
||||
auto channel = std::make_shared<nano::transport::channel_tcp> (node, socket);
|
||||
channel->update_endpoints ();
|
||||
channel->set_node_id (node_id);
|
||||
|
||||
attempts.get<endpoint_tag> ().erase (endpoint);
|
||||
|
||||
auto [_, inserted] = channels.get<endpoint_tag> ().emplace (channel, socket, server);
|
||||
debug_assert (inserted);
|
||||
|
||||
lock.unlock ();
|
||||
|
||||
node.network.channel_observer (channel);
|
||||
|
||||
return channel;
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::erase (nano::tcp_endpoint const & endpoint_a)
|
||||
|
@ -216,7 +283,7 @@ std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::t
|
|||
continue;
|
||||
}
|
||||
|
||||
if (channel->get_network_version () >= min_version && (include_temporary_channels_a || !channel->temporary))
|
||||
if (channel->get_network_version () >= min_version)
|
||||
{
|
||||
result.insert (channel);
|
||||
}
|
||||
|
@ -275,65 +342,23 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer ()
|
|||
return result;
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::queue_message (std::unique_ptr<nano::message> message, std::shared_ptr<nano::transport::channel_tcp> channel)
|
||||
{
|
||||
if (!stopped)
|
||||
{
|
||||
message_manager.put (std::move (message), std::move (channel));
|
||||
}
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::process_messages ()
|
||||
{
|
||||
while (!stopped)
|
||||
{
|
||||
auto item = message_manager.get_message ();
|
||||
if (item.message != nullptr)
|
||||
auto [message, channel] = message_manager.next ();
|
||||
if (message != nullptr)
|
||||
{
|
||||
process_message (*item.message, item.endpoint, item.node_id, item.socket);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a, std::shared_ptr<nano::transport::socket> const & socket_a)
|
||||
{
|
||||
auto type_a = socket_a->type ();
|
||||
if (!stopped && message_a.header.version_using >= node.network_params.network.protocol_version_min)
|
||||
{
|
||||
auto channel (node.network.find_channel (nano::transport::map_tcp_to_endpoint (endpoint_a)));
|
||||
if (channel)
|
||||
{
|
||||
sink (message_a, channel);
|
||||
}
|
||||
else
|
||||
{
|
||||
channel = node.network.find_node_id (node_id_a);
|
||||
if (channel)
|
||||
{
|
||||
sink (message_a, channel);
|
||||
}
|
||||
else if (!node.network.excluded_peers.check (endpoint_a))
|
||||
{
|
||||
if (!node_id_a.is_zero ())
|
||||
{
|
||||
// Add temporary channel
|
||||
auto temporary_channel (std::make_shared<nano::transport::channel_tcp> (node, socket_a));
|
||||
temporary_channel->update_endpoint ();
|
||||
debug_assert (endpoint_a == temporary_channel->get_tcp_endpoint ());
|
||||
temporary_channel->set_node_id (node_id_a);
|
||||
temporary_channel->set_network_version (message_a.header.version_using);
|
||||
temporary_channel->temporary = true;
|
||||
debug_assert (type_a == nano::transport::socket_type::realtime || type_a == nano::transport::socket_type::realtime_response_server);
|
||||
// Don't insert temporary channels for response_server
|
||||
if (type_a == nano::transport::socket_type::realtime)
|
||||
{
|
||||
insert (temporary_channel, socket_a, nullptr);
|
||||
}
|
||||
sink (message_a, temporary_channel);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Initial node_id_handshake request without node ID
|
||||
debug_assert (message_a.header.type == nano::message_type::node_id_handshake);
|
||||
node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (channel)
|
||||
{
|
||||
channel->set_last_packet_received (std::chrono::steady_clock::now ());
|
||||
release_assert (channel != nullptr);
|
||||
sink (*message, channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -529,7 +554,7 @@ void nano::transport::tcp_channels::list (std::deque<std::shared_ptr<nano::trans
|
|||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
// clang-format off
|
||||
nano::transform_if (channels.get<random_access_tag> ().begin (), channels.get<random_access_tag> ().end (), std::back_inserter (deque_a),
|
||||
[include_temporary_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a && (include_temporary_channels_a || !channel_a.channel->temporary); },
|
||||
[include_temporary_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a; },
|
||||
[](auto const & channel) { return channel.channel; });
|
||||
// clang-format on
|
||||
}
|
||||
|
@ -546,199 +571,7 @@ void nano::transport::tcp_channels::modify (std::shared_ptr<nano::transport::cha
|
|||
}
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a)
|
||||
void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
|
||||
{
|
||||
auto socket = std::make_shared<nano::transport::socket> (node);
|
||||
std::weak_ptr<nano::transport::socket> socket_w (socket);
|
||||
auto channel (std::make_shared<nano::transport::channel_tcp> (node, socket_w));
|
||||
std::weak_ptr<nano::node> node_w (node.shared ());
|
||||
socket->async_connect (nano::transport::map_endpoint_to_tcp (endpoint_a),
|
||||
[node_w, channel, socket, endpoint_a] (boost::system::error_code const & ec) {
|
||||
if (auto node_l = node_w.lock ())
|
||||
{
|
||||
if (!ec && channel)
|
||||
{
|
||||
// TCP node ID handshake
|
||||
auto query = node_l->network.prepare_handshake_query (endpoint_a);
|
||||
nano::node_id_handshake message{ node_l->network_params.network, query };
|
||||
|
||||
node_l->logger.debug (nano::log::type::tcp, "Handshake sent to: {} (query: {})",
|
||||
nano::util::to_str (endpoint_a),
|
||||
(query ? query->cookie.to_string () : "<none>"));
|
||||
|
||||
channel->update_endpoint ();
|
||||
std::shared_ptr<std::vector<uint8_t>> receive_buffer (std::make_shared<std::vector<uint8_t>> ());
|
||||
receive_buffer->resize (256);
|
||||
channel->send (message, [node_w, channel, endpoint_a, receive_buffer] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
if (auto node_l = node_w.lock ())
|
||||
{
|
||||
if (!ec)
|
||||
{
|
||||
node_l->network.tcp_channels.start_tcp_receive_node_id (channel, endpoint_a, receive_buffer);
|
||||
}
|
||||
else
|
||||
{
|
||||
node_l->logger.debug (nano::log::type::tcp, "Error sending handshake to: {} ({})", nano::util::to_str (endpoint_a), ec.message ());
|
||||
|
||||
if (auto socket_l = channel->socket.lock ())
|
||||
{
|
||||
socket_l->close ();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
node_l->logger.debug (nano::log::type::tcp, "Error connecting to: {} ({})", nano::util::to_str (endpoint_a), ec.message ());
|
||||
}
|
||||
else
|
||||
{
|
||||
node_l->logger.debug (nano::log::type::tcp, "Error connecting to: {}", nano::util::to_str (endpoint_a));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const & channel_a, nano::endpoint const & endpoint_a, std::shared_ptr<std::vector<uint8_t>> const & receive_buffer_a)
|
||||
{
|
||||
std::weak_ptr<nano::node> node_w (node.shared ());
|
||||
auto socket_l = channel_a->socket.lock ();
|
||||
if (!socket_l)
|
||||
{
|
||||
return;
|
||||
}
|
||||
auto cleanup_node_id_handshake_socket = [socket_w = channel_a->socket, node_w] (nano::endpoint const & endpoint_a) {
|
||||
if (auto node_l = node_w.lock ())
|
||||
{
|
||||
if (auto socket_l = socket_w.lock ())
|
||||
{
|
||||
socket_l->close ();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto message_deserializer = std::make_shared<nano::transport::message_deserializer> (node.network_params.network, node.network.publish_filter, node.block_uniquer, node.vote_uniquer,
|
||||
[socket_l] (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
|
||||
debug_assert (socket_l != nullptr);
|
||||
socket_l->read_impl (data_a, size_a, callback_a);
|
||||
});
|
||||
message_deserializer->read ([node_w, socket_l, channel_a, endpoint_a, cleanup_node_id_handshake_socket] (boost::system::error_code ec, std::unique_ptr<nano::message> message) {
|
||||
auto node_l = node_w.lock ();
|
||||
if (!node_l)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (ec || !channel_a)
|
||||
{
|
||||
node_l->logger.debug (nano::log::type::tcp, "Error reading handshake from: {} ({})", nano::util::to_str (endpoint_a), ec.message ());
|
||||
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
return;
|
||||
}
|
||||
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
|
||||
auto error (false);
|
||||
|
||||
// the header type should in principle be checked after checking the network bytes and the version numbers, I will not change it here since the benefits do not outweight the difficulties
|
||||
if (error || message->type () != nano::message_type::node_id_handshake)
|
||||
{
|
||||
node_l->logger.debug (nano::log::type::tcp, "Error reading handshake header from: {} ({})", nano::util::to_str (endpoint_a), ec.message ());
|
||||
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
return;
|
||||
}
|
||||
auto & handshake = static_cast<nano::node_id_handshake &> (*message);
|
||||
|
||||
if (message->header.network != node_l->network_params.network.current_network || message->header.version_using < node_l->network_params.network.protocol_version_min)
|
||||
{
|
||||
// error handling, either the networks bytes or the version is wrong
|
||||
if (message->header.network == node_l->network_params.network.current_network)
|
||||
{
|
||||
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::invalid_network);
|
||||
}
|
||||
else
|
||||
{
|
||||
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::outdated_version);
|
||||
}
|
||||
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
// Cleanup attempt
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ node_l->network.tcp_channels.mutex };
|
||||
node_l->network.tcp_channels.attempts.get<endpoint_tag> ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (error || !handshake.response || !handshake.query)
|
||||
{
|
||||
node_l->logger.debug (nano::log::type::tcp, "Error reading handshake payload from: {} ({})", nano::util::to_str (endpoint_a), ec.message ());
|
||||
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
return;
|
||||
}
|
||||
channel_a->set_network_version (handshake.header.version_using);
|
||||
|
||||
debug_assert (handshake.query);
|
||||
debug_assert (handshake.response);
|
||||
|
||||
auto const node_id = handshake.response->node_id;
|
||||
|
||||
if (!node_l->network.verify_handshake_response (*handshake.response, endpoint_a))
|
||||
{
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
return;
|
||||
}
|
||||
|
||||
/* If node ID is known, don't establish new connection
|
||||
Exception: temporary channels from tcp_server */
|
||||
auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id));
|
||||
if (existing_channel && !existing_channel->temporary)
|
||||
{
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
return;
|
||||
}
|
||||
|
||||
channel_a->set_node_id (node_id);
|
||||
channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
|
||||
|
||||
debug_assert (handshake.query);
|
||||
auto response = node_l->network.prepare_handshake_response (*handshake.query, handshake.is_v2 ());
|
||||
nano::node_id_handshake handshake_response (node_l->network_params.network, std::nullopt, response);
|
||||
|
||||
node_l->logger.debug (nano::log::type::tcp, "Handshake response sent to {} (query: {})",
|
||||
nano::util::to_str (endpoint_a),
|
||||
handshake.query->cookie.to_string ());
|
||||
|
||||
channel_a->send (handshake_response, [node_w, channel_a, endpoint_a, cleanup_node_id_handshake_socket] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
auto node_l = node_w.lock ();
|
||||
if (!node_l)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (ec || !channel_a)
|
||||
{
|
||||
node_l->logger.debug (nano::log::type::tcp, "Error sending handshake response to: {} ({})", nano::util::to_str (endpoint_a), ec.message ());
|
||||
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
return;
|
||||
}
|
||||
// Insert new node ID connection
|
||||
auto socket_l = channel_a->socket.lock ();
|
||||
if (!socket_l)
|
||||
{
|
||||
return;
|
||||
}
|
||||
channel_a->set_last_packet_sent (std::chrono::steady_clock::now ());
|
||||
auto response_server = std::make_shared<nano::transport::tcp_server> (socket_l, node_l);
|
||||
node_l->network.tcp_channels.insert (channel_a, socket_l, response_server);
|
||||
// Listen for possible responses
|
||||
response_server->socket->type_set (nano::transport::socket_type::realtime_response_server);
|
||||
response_server->remote_node_id = channel_a->get_node_id ();
|
||||
response_server->start ();
|
||||
});
|
||||
});
|
||||
node.tcp_listener.connect (endpoint.address (), endpoint.port ());
|
||||
}
|
||||
|
|
|
@ -20,40 +20,35 @@ namespace mi = boost::multi_index;
|
|||
|
||||
namespace nano
|
||||
{
|
||||
class tcp_message_item final
|
||||
{
|
||||
public:
|
||||
std::shared_ptr<nano::message> message;
|
||||
nano::tcp_endpoint endpoint;
|
||||
nano::account node_id;
|
||||
std::shared_ptr<nano::transport::socket> socket;
|
||||
};
|
||||
|
||||
class tcp_message_manager final
|
||||
{
|
||||
public:
|
||||
tcp_message_manager (unsigned incoming_connections_max_a);
|
||||
void put_message (nano::tcp_message_item const & item_a);
|
||||
nano::tcp_message_item get_message ();
|
||||
// Stop container and notify waiting threads
|
||||
void stop ();
|
||||
|
||||
private:
|
||||
nano::mutex mutex;
|
||||
nano::condition_variable producer_condition;
|
||||
nano::condition_variable consumer_condition;
|
||||
std::deque<nano::tcp_message_item> entries;
|
||||
unsigned max_entries;
|
||||
static unsigned const max_entries_per_connection = 16;
|
||||
bool stopped{ false };
|
||||
|
||||
friend class network_tcp_message_manager_Test;
|
||||
};
|
||||
|
||||
namespace transport
|
||||
{
|
||||
class tcp_server;
|
||||
class tcp_channels;
|
||||
class channel_tcp;
|
||||
|
||||
// TODO: Replace with message_processor component with fair queueing
|
||||
class tcp_message_manager final
|
||||
{
|
||||
public:
|
||||
using entry_t = std::pair<std::unique_ptr<nano::message>, std::shared_ptr<nano::transport::channel_tcp>>;
|
||||
|
||||
explicit tcp_message_manager (unsigned incoming_connections_max);
|
||||
void stop ();
|
||||
|
||||
void put (std::unique_ptr<nano::message>, std::shared_ptr<nano::transport::channel_tcp>);
|
||||
entry_t next ();
|
||||
|
||||
private:
|
||||
nano::mutex mutex;
|
||||
nano::condition_variable producer_condition;
|
||||
nano::condition_variable consumer_condition;
|
||||
std::deque<entry_t> entries;
|
||||
unsigned max_entries;
|
||||
static unsigned const max_entries_per_connection = 16;
|
||||
bool stopped{ false };
|
||||
|
||||
friend class network_tcp_message_manager_Test;
|
||||
};
|
||||
|
||||
class channel_tcp : public nano::transport::channel, public std::enable_shared_from_this<channel_tcp>
|
||||
{
|
||||
|
@ -63,21 +58,13 @@ namespace transport
|
|||
channel_tcp (nano::node &, std::weak_ptr<nano::transport::socket>);
|
||||
~channel_tcp () override;
|
||||
|
||||
void update_endpoints ();
|
||||
|
||||
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions//
|
||||
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override;
|
||||
|
||||
std::string to_string () const override;
|
||||
|
||||
void update_endpoint ()
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk (channel_mutex);
|
||||
debug_assert (endpoint == nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0)); // Not initialized endpoint value
|
||||
if (auto socket_l = socket.lock ())
|
||||
{
|
||||
endpoint = socket_l->remote_endpoint ();
|
||||
}
|
||||
}
|
||||
|
||||
nano::endpoint get_endpoint () const override
|
||||
{
|
||||
return nano::transport::map_tcp_to_endpoint (get_tcp_endpoint ());
|
||||
|
@ -89,6 +76,12 @@ namespace transport
|
|||
return endpoint;
|
||||
}
|
||||
|
||||
nano::endpoint get_local_endpoint () const override
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lk (channel_mutex);
|
||||
return local_endpoint;
|
||||
}
|
||||
|
||||
nano::transport::transport_type get_type () const override
|
||||
{
|
||||
return nano::transport::transport_type::tcp;
|
||||
|
@ -124,12 +117,9 @@ namespace transport
|
|||
public:
|
||||
std::weak_ptr<nano::transport::socket> socket;
|
||||
|
||||
/* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server.
|
||||
If remote part has open listening port, temporary channel will be replaced with direct connection to listening port soon. But if other side is behing NAT or firewall this connection can be pemanent. */
|
||||
std::atomic<bool> temporary{ false };
|
||||
|
||||
private:
|
||||
nano::tcp_endpoint endpoint{ boost::asio::ip::address_v6::any (), 0 };
|
||||
nano::endpoint endpoint;
|
||||
nano::endpoint local_endpoint;
|
||||
|
||||
public: // Logging
|
||||
void operator() (nano::object_stream &) const override;
|
||||
|
@ -137,7 +127,7 @@ namespace transport
|
|||
|
||||
class tcp_channels final
|
||||
{
|
||||
friend class nano::transport::channel_tcp;
|
||||
friend class channel_tcp;
|
||||
friend class telemetry_simultaneous_requests_Test;
|
||||
friend class network_peer_max_tcp_attempts_subnetwork_Test;
|
||||
|
||||
|
@ -148,7 +138,7 @@ namespace transport
|
|||
void start ();
|
||||
void stop ();
|
||||
|
||||
bool insert (std::shared_ptr<nano::transport::channel_tcp> const &, std::shared_ptr<nano::transport::socket> const &, std::shared_ptr<nano::transport::tcp_server> const &);
|
||||
std::shared_ptr<nano::transport::channel_tcp> create (std::shared_ptr<nano::transport::socket> const &, std::shared_ptr<nano::transport::tcp_server> const &, nano::account const & node_id);
|
||||
void erase (nano::tcp_endpoint const &);
|
||||
std::size_t size () const;
|
||||
std::shared_ptr<nano::transport::channel_tcp> find_channel (nano::tcp_endpoint const &) const;
|
||||
|
@ -157,8 +147,8 @@ namespace transport
|
|||
std::shared_ptr<nano::transport::channel_tcp> find_node_id (nano::account const &);
|
||||
// Get the next peer for attempting a tcp connection
|
||||
nano::tcp_endpoint bootstrap_peer ();
|
||||
void queue_message (std::unique_ptr<nano::message>, std::shared_ptr<nano::transport::channel_tcp>);
|
||||
void process_messages ();
|
||||
void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr<nano::transport::socket> const &);
|
||||
bool max_ip_connections (nano::tcp_endpoint const & endpoint_a);
|
||||
bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a);
|
||||
bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a);
|
||||
|
@ -173,16 +163,16 @@ namespace transport
|
|||
|
||||
// Connection start
|
||||
void start_tcp (nano::endpoint const &);
|
||||
void start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const &, nano::endpoint const &, std::shared_ptr<std::vector<uint8_t>> const &);
|
||||
|
||||
private: // Dependencies
|
||||
nano::node & node;
|
||||
|
||||
public:
|
||||
nano::tcp_message_manager message_manager;
|
||||
tcp_message_manager message_manager;
|
||||
|
||||
private:
|
||||
void close ();
|
||||
bool check (nano::tcp_endpoint const &, nano::account const & node_id) const;
|
||||
|
||||
private:
|
||||
class channel_entry final
|
||||
|
@ -215,8 +205,7 @@ namespace transport
|
|||
}
|
||||
nano::account node_id () const
|
||||
{
|
||||
auto node_id (channel->get_node_id ());
|
||||
return node_id;
|
||||
return channel->get_node_id ();
|
||||
}
|
||||
uint8_t network_version () const
|
||||
{
|
||||
|
|
|
@ -8,6 +8,9 @@
|
|||
#include <boost/asio/use_future.hpp>
|
||||
|
||||
#include <memory>
|
||||
#include <ranges>
|
||||
|
||||
#include <magic_enum.hpp>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
|
@ -15,26 +18,27 @@ using namespace std::chrono_literals;
|
|||
* tcp_listener
|
||||
*/
|
||||
|
||||
nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) :
|
||||
nano::transport::tcp_listener::tcp_listener (uint16_t port_a, tcp_config const & config_a, nano::node & node_a) :
|
||||
config{ config_a },
|
||||
node{ node_a },
|
||||
stats{ node_a.stats },
|
||||
logger{ node_a.logger },
|
||||
port{ port_a },
|
||||
max_inbound_connections{ max_inbound_connections },
|
||||
strand{ node_a.io_ctx.get_executor () },
|
||||
acceptor{ strand },
|
||||
task{ strand }
|
||||
{
|
||||
connection_accepted.add ([this] (auto const & socket, auto const & server) {
|
||||
node.observers.socket_accepted.notify (*socket);
|
||||
node.observers.socket_connected.notify (*socket);
|
||||
});
|
||||
}
|
||||
|
||||
nano::transport::tcp_listener::~tcp_listener ()
|
||||
{
|
||||
// Thread should be stopped before destruction
|
||||
debug_assert (!cleanup_thread.joinable ());
|
||||
debug_assert (!task.joinable ());
|
||||
debug_assert (connection_count () == 0);
|
||||
debug_assert (attempt_count () == 0);
|
||||
}
|
||||
|
||||
void nano::transport::tcp_listener::start ()
|
||||
|
@ -91,7 +95,8 @@ void nano::transport::tcp_listener::start ()
|
|||
{
|
||||
logger.critical (nano::log::type::tcp_listener, "Unknown error");
|
||||
release_assert (false); // Unexpected error
|
||||
} });
|
||||
}
|
||||
});
|
||||
|
||||
cleanup_thread = std::thread ([this] {
|
||||
nano::thread_role::set (nano::thread_role::name::tcp_listener);
|
||||
|
@ -130,9 +135,18 @@ void nano::transport::tcp_listener::stop ()
|
|||
}
|
||||
|
||||
decltype (connections) connections_l;
|
||||
decltype (attempts) attempts_l;
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
connections_l.swap (connections);
|
||||
attempts_l.swap (attempts);
|
||||
}
|
||||
|
||||
for (auto & attempt : attempts_l)
|
||||
{
|
||||
debug_assert (attempt.task.joinable ());
|
||||
attempt.task.cancel ();
|
||||
attempt.task.join ();
|
||||
}
|
||||
|
||||
for (auto & connection : connections_l)
|
||||
|
@ -154,7 +168,10 @@ void nano::transport::tcp_listener::run_cleanup ()
|
|||
while (!stopped)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::cleanup);
|
||||
|
||||
cleanup ();
|
||||
timeout ();
|
||||
|
||||
condition.wait_for (lock, 1s, [this] () { return stopped.load (); });
|
||||
}
|
||||
}
|
||||
|
@ -163,6 +180,7 @@ void nano::transport::tcp_listener::cleanup ()
|
|||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
|
||||
// Erase dead connections
|
||||
erase_if (connections, [this] (auto const & connection) {
|
||||
if (connection.socket.expired () && connection.server.expired ())
|
||||
{
|
||||
|
@ -175,6 +193,109 @@ void nano::transport::tcp_listener::cleanup ()
|
|||
return false;
|
||||
}
|
||||
});
|
||||
|
||||
// Erase completed attempts
|
||||
erase_if (attempts, [this] (auto const & attempt) {
|
||||
return attempt.task.ready ();
|
||||
});
|
||||
}
|
||||
|
||||
void nano::transport::tcp_listener::timeout ()
|
||||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
|
||||
auto const cutoff = std::chrono::steady_clock::now () - config.connect_timeout;
|
||||
|
||||
// Cancel timed out attempts
|
||||
for (auto & attempt : attempts)
|
||||
{
|
||||
if (!attempt.task.ready () && attempt.start < cutoff)
|
||||
{
|
||||
attempt.task.cancel ();
|
||||
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout);
|
||||
logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)",
|
||||
fmt::streamed (attempt.endpoint), nano::log::seconds_delta (attempt.start));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool nano::transport::tcp_listener::connect (asio::ip::address ip, uint16_t port)
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
|
||||
if (port == 0)
|
||||
{
|
||||
port = node.network_params.network.default_node_port;
|
||||
}
|
||||
|
||||
if (auto count = attempts.size (); count > config.max_attempts)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_attempts, nano::stat::dir::out);
|
||||
logger.debug (nano::log::type::tcp_listener, "Max connection attempts reached ({}), rejected connection attempt: {}",
|
||||
count, ip.to_string ());
|
||||
|
||||
return false; // Rejected
|
||||
}
|
||||
|
||||
if (auto count = count_attempts (ip); count >= config.max_attempts_per_ip)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_attempts_per_ip, nano::stat::dir::out);
|
||||
logger.debug (nano::log::type::tcp_listener, "Connection attempt already in progress ({}), rejected connection attempt: {}",
|
||||
count, ip.to_string ());
|
||||
|
||||
return false; // Rejected
|
||||
}
|
||||
|
||||
if (auto result = check_limits (ip, connection_type::outbound); result != accept_result::accepted)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_rejected, nano::stat::dir::out);
|
||||
// Refusal reason should be logged earlier
|
||||
|
||||
return false; // Rejected
|
||||
}
|
||||
|
||||
nano::tcp_endpoint const endpoint{ ip, port };
|
||||
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_initiate, nano::stat::dir::out);
|
||||
logger.debug (nano::log::type::tcp_listener, "Initiating outgoing connection to: {}", fmt::streamed (endpoint));
|
||||
|
||||
auto task = nano::async::task (strand, connect_impl (endpoint));
|
||||
|
||||
attempts.emplace_back (attempt{ endpoint, std::move (task) });
|
||||
|
||||
return true; // Attempt started
|
||||
}
|
||||
|
||||
auto nano::transport::tcp_listener::connect_impl (asio::ip::tcp::endpoint endpoint) -> asio::awaitable<void>
|
||||
{
|
||||
debug_assert (strand.running_in_this_thread ());
|
||||
|
||||
try
|
||||
{
|
||||
auto raw_socket = co_await connect_socket (endpoint);
|
||||
debug_assert (strand.running_in_this_thread ());
|
||||
|
||||
auto result = accept_one (std::move (raw_socket), connection_type::outbound);
|
||||
if (result.result == accept_result::accepted)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_success, nano::stat::dir::out);
|
||||
logger.debug (nano::log::type::tcp_listener, "Successfully connected to: {}", fmt::streamed (endpoint));
|
||||
|
||||
release_assert (result.server);
|
||||
result.server->initiate_handshake ();
|
||||
}
|
||||
else
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_failure, nano::stat::dir::out);
|
||||
// Refusal reason should be logged earlier
|
||||
}
|
||||
}
|
||||
catch (boost::system::system_error const & ex)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_error, nano::stat::dir::out);
|
||||
logger.log (nano::log::level::debug, nano::log::type::tcp_listener, "Error connecting to: {} ({})", fmt::streamed (endpoint), ex.what ());
|
||||
}
|
||||
}
|
||||
|
||||
asio::awaitable<void> nano::transport::tcp_listener::run ()
|
||||
|
@ -188,8 +309,10 @@ asio::awaitable<void> nano::transport::tcp_listener::run ()
|
|||
try
|
||||
{
|
||||
auto socket = co_await accept_socket ();
|
||||
auto result = accept_one (std::move (socket));
|
||||
if (result != accept_result::accepted)
|
||||
debug_assert (strand.running_in_this_thread ());
|
||||
|
||||
auto result = accept_one (std::move (socket), connection_type::inbound);
|
||||
if (result.result != accept_result::accepted)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in);
|
||||
// Refusal reason should be logged earlier
|
||||
|
@ -218,15 +341,43 @@ asio::awaitable<asio::ip::tcp::socket> nano::transport::tcp_listener::accept_soc
|
|||
co_return co_await acceptor.async_accept (asio::use_awaitable);
|
||||
}
|
||||
|
||||
auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket) -> accept_result
|
||||
asio::awaitable<asio::ip::tcp::socket> nano::transport::tcp_listener::connect_socket (asio::ip::tcp::endpoint endpoint)
|
||||
{
|
||||
debug_assert (strand.running_in_this_thread ());
|
||||
|
||||
asio::ip::tcp::socket raw_socket{ strand };
|
||||
co_await raw_socket.async_connect (endpoint, asio::use_awaitable);
|
||||
|
||||
co_return raw_socket;
|
||||
}
|
||||
|
||||
asio::awaitable<void> nano::transport::tcp_listener::wait_available_slots () const
|
||||
{
|
||||
nano::interval log_interval;
|
||||
while (connection_count () >= config.max_inbound_connections && !stopped)
|
||||
{
|
||||
if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s))
|
||||
{
|
||||
logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})",
|
||||
connection_count (), config.max_inbound_connections);
|
||||
}
|
||||
|
||||
co_await nano::async::sleep_for (100ms);
|
||||
}
|
||||
}
|
||||
|
||||
auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket, connection_type type) -> accept_return
|
||||
{
|
||||
auto const remote_endpoint = raw_socket.remote_endpoint ();
|
||||
auto const local_endpoint = raw_socket.local_endpoint ();
|
||||
|
||||
if (auto result = check_limits (remote_endpoint.address ()); result != accept_result::accepted)
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
|
||||
if (auto result = check_limits (remote_endpoint.address (), type); result != accept_result::accepted)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, nano::stat::dir::in);
|
||||
// Refusal reason should be logged earlier
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_rejected, to_stat_dir (type));
|
||||
logger.debug (nano::log::type::tcp_listener, "Rejected connection from: {} ({})", fmt::streamed (remote_endpoint), to_string (type));
|
||||
// Rejection reason should be logged earlier
|
||||
|
||||
try
|
||||
{
|
||||
|
@ -236,23 +387,22 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
|
|||
}
|
||||
catch (boost::system::system_error const & ex)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, nano::stat::dir::in);
|
||||
logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {}", ex.what ());
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, to_stat_dir (type));
|
||||
logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {} ({})", ex.what (), to_string (type));
|
||||
}
|
||||
|
||||
return result;
|
||||
return { result };
|
||||
}
|
||||
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in);
|
||||
logger.debug (nano::log::type::tcp_listener, "Accepted incoming connection from: {}", fmt::streamed (remote_endpoint));
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, to_stat_dir (type));
|
||||
logger.debug (nano::log::type::tcp_listener, "Accepted connection: {} ({})", fmt::streamed (remote_endpoint), to_string (type));
|
||||
|
||||
auto socket = std::make_shared<nano::transport::socket> (node, std::move (raw_socket), remote_endpoint, local_endpoint, socket_endpoint::server);
|
||||
auto socket = std::make_shared<nano::transport::socket> (node, std::move (raw_socket), remote_endpoint, local_endpoint, to_socket_endpoint (type));
|
||||
auto server = std::make_shared<nano::transport::tcp_server> (socket, node.shared (), true);
|
||||
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
connections.emplace (entry{ remote_endpoint, socket, server });
|
||||
}
|
||||
connections.emplace_back (connection{ remote_endpoint, socket, server });
|
||||
|
||||
lock.unlock ();
|
||||
|
||||
socket->set_timeout (node.network_params.network.idle_timeout);
|
||||
socket->start ();
|
||||
|
@ -260,49 +410,37 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
|
|||
|
||||
connection_accepted.notify (socket, server);
|
||||
|
||||
return accept_result::accepted;
|
||||
return { accept_result::accepted, socket, server };
|
||||
}
|
||||
|
||||
asio::awaitable<void> nano::transport::tcp_listener::wait_available_slots () const
|
||||
auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) -> accept_result
|
||||
{
|
||||
nano::interval log_interval;
|
||||
while (connection_count () >= max_inbound_connections && !stopped)
|
||||
debug_assert (!mutex.try_lock ());
|
||||
|
||||
if (stopped)
|
||||
{
|
||||
if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s))
|
||||
{
|
||||
logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})",
|
||||
connection_count (), max_inbound_connections);
|
||||
}
|
||||
|
||||
co_await nano::async::sleep_for (100ms);
|
||||
return accept_result::rejected;
|
||||
}
|
||||
}
|
||||
|
||||
auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip) -> accept_result
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
|
||||
cleanup ();
|
||||
|
||||
debug_assert (connections.size () <= max_inbound_connections); // Should be checked earlier (wait_available_slots)
|
||||
|
||||
if (node.network.excluded_peers.check (ip)) // true => error
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::excluded, nano::stat::dir::in);
|
||||
stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded, to_stat_dir (type));
|
||||
logger.debug (nano::log::type::tcp_listener, "Rejected connection from excluded peer: {}", ip.to_string ());
|
||||
|
||||
return accept_result::excluded;
|
||||
return accept_result::rejected;
|
||||
}
|
||||
|
||||
if (!node.flags.disable_max_peers_per_ip)
|
||||
{
|
||||
if (auto count = count_per_ip (ip); count >= node.network_params.network.max_peers_per_ip)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, nano::stat::dir::in);
|
||||
logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached (ip: {}, count: {}), unable to open new connection",
|
||||
ip.to_string (), count);
|
||||
stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_ip, to_stat_dir (type));
|
||||
logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection: {}",
|
||||
count, ip.to_string ());
|
||||
|
||||
return accept_result::too_many_per_ip;
|
||||
return accept_result::rejected;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -311,11 +449,36 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip)
|
|||
{
|
||||
if (auto count = count_per_subnetwork (ip); count >= node.network_params.network.max_peers_per_subnetwork)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_subnetwork, nano::stat::dir::in);
|
||||
logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (ip: {}, count: {}), unable to open new connection",
|
||||
ip.to_string (), count);
|
||||
stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_subnetwork, to_stat_dir (type));
|
||||
logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached ({}), unable to open new connection: {}",
|
||||
count, ip.to_string ());
|
||||
|
||||
return accept_result::too_many_per_subnetwork;
|
||||
return accept_result::rejected;
|
||||
}
|
||||
}
|
||||
|
||||
if (type == connection_type::inbound)
|
||||
{
|
||||
debug_assert (connections.size () <= config.max_inbound_connections); // Should be checked earlier (wait_available_slots)
|
||||
|
||||
if (auto count = count_per_type (connection_type::inbound); count >= config.max_inbound_connections)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_attempts, to_stat_dir (type));
|
||||
logger.debug (nano::log::type::tcp_listener, "Max inbound connections reached ({}), unable to accept new connection: {}",
|
||||
count, ip.to_string ());
|
||||
|
||||
return accept_result::rejected;
|
||||
}
|
||||
}
|
||||
if (type == connection_type::outbound)
|
||||
{
|
||||
if (auto count = count_per_type (connection_type::outbound); count >= config.max_outbound_connections)
|
||||
{
|
||||
stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_attempts, to_stat_dir (type));
|
||||
logger.debug (nano::log::type::tcp_listener, "Max outbound connections reached ({}), unable to initiate new connection: {}",
|
||||
count, ip.to_string ());
|
||||
|
||||
return accept_result::rejected;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -328,6 +491,12 @@ size_t nano::transport::tcp_listener::connection_count () const
|
|||
return connections.size ();
|
||||
}
|
||||
|
||||
size_t nano::transport::tcp_listener::attempt_count () const
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
return attempts.size ();
|
||||
}
|
||||
|
||||
size_t nano::transport::tcp_listener::realtime_count () const
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
|
@ -354,6 +523,19 @@ size_t nano::transport::tcp_listener::bootstrap_count () const
|
|||
});
|
||||
}
|
||||
|
||||
size_t nano::transport::tcp_listener::count_per_type (connection_type type) const
|
||||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
|
||||
return std::count_if (connections.begin (), connections.end (), [type] (auto const & connection) {
|
||||
if (auto socket = connection.socket.lock ())
|
||||
{
|
||||
return socket->endpoint_type () == to_socket_endpoint (type);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
size_t nano::transport::tcp_listener::count_per_ip (asio::ip::address const & ip) const
|
||||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
|
@ -372,15 +554,74 @@ size_t nano::transport::tcp_listener::count_per_subnetwork (asio::ip::address co
|
|||
});
|
||||
}
|
||||
|
||||
size_t nano::transport::tcp_listener::count_attempts (asio::ip::address const & ip) const
|
||||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
|
||||
return std::count_if (attempts.begin (), attempts.end (), [&ip] (auto const & attempt) {
|
||||
return nano::transport::is_same_ip (attempt.address (), ip);
|
||||
});
|
||||
}
|
||||
|
||||
asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
return { asio::ip::address_v6::loopback (), local.port () };
|
||||
}
|
||||
|
||||
auto nano::transport::tcp_listener::sockets () const -> std::vector<std::shared_ptr<socket>>
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
auto r = connections
|
||||
| std::views::transform ([] (auto const & connection) { return connection.socket.lock (); })
|
||||
| std::views::filter ([] (auto const & socket) { return socket != nullptr; });
|
||||
return { r.begin (), r.end () };
|
||||
}
|
||||
|
||||
auto nano::transport::tcp_listener::servers () const -> std::vector<std::shared_ptr<tcp_server>>
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
auto r = connections
|
||||
| std::views::transform ([] (auto const & connection) { return connection.server.lock (); })
|
||||
| std::views::filter ([] (auto const & server) { return server != nullptr; });
|
||||
return { r.begin (), r.end () };
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::container_info_component> nano::transport::tcp_listener::collect_container_info (std::string const & name)
|
||||
{
|
||||
auto composite = std::make_unique<container_info_composite> (name);
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) }));
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "attempts", attempt_count (), sizeof (decltype (attempts)::value_type) }));
|
||||
return composite;
|
||||
}
|
||||
}
|
||||
|
||||
nano::stat::dir nano::transport::tcp_listener::to_stat_dir (connection_type type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case connection_type::inbound:
|
||||
return nano::stat::dir::in;
|
||||
case connection_type::outbound:
|
||||
return nano::stat::dir::out;
|
||||
}
|
||||
debug_assert (false);
|
||||
return {};
|
||||
}
|
||||
|
||||
std::string_view nano::transport::tcp_listener::to_string (connection_type type)
|
||||
{
|
||||
return magic_enum::enum_name (type);
|
||||
}
|
||||
|
||||
nano::transport::socket_endpoint nano::transport::tcp_listener::to_socket_endpoint (connection_type type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case connection_type::inbound:
|
||||
return socket_endpoint::server;
|
||||
case connection_type::outbound:
|
||||
return socket_endpoint::client;
|
||||
}
|
||||
debug_assert (false);
|
||||
return {};
|
||||
}
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
#include <nano/lib/async.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/fwd.hpp>
|
||||
#include <nano/node/transport/common.hpp>
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/multi_index/hashed_index.hpp>
|
||||
|
@ -9,42 +11,71 @@
|
|||
#include <boost/multi_index_container.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <future>
|
||||
#include <list>
|
||||
#include <string_view>
|
||||
#include <thread>
|
||||
|
||||
namespace mi = boost::multi_index;
|
||||
namespace asio = boost::asio;
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class node;
|
||||
class stats;
|
||||
class logger;
|
||||
}
|
||||
|
||||
namespace nano::transport
|
||||
{
|
||||
class socket;
|
||||
class tcp_server;
|
||||
|
||||
class tcp_config
|
||||
{
|
||||
public:
|
||||
explicit tcp_config (nano::network_constants const & network)
|
||||
{
|
||||
if (network.is_dev_network ())
|
||||
{
|
||||
max_inbound_connections = 128;
|
||||
max_outbound_connections = 128;
|
||||
max_attempts = 128;
|
||||
max_attempts_per_ip = 128;
|
||||
connect_timeout = std::chrono::seconds{ 5 };
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
size_t max_inbound_connections{ 2048 };
|
||||
size_t max_outbound_connections{ 2048 };
|
||||
size_t max_attempts{ 60 };
|
||||
size_t max_attempts_per_ip{ 1 };
|
||||
std::chrono::seconds connect_timeout{ 60 };
|
||||
};
|
||||
|
||||
/**
|
||||
* Server side portion of tcp sessions. Listens for new socket connections and spawns tcp_server objects when connected.
|
||||
*/
|
||||
class tcp_listener final
|
||||
{
|
||||
public:
|
||||
tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections);
|
||||
tcp_listener (uint16_t port, tcp_config const &, nano::node &);
|
||||
~tcp_listener ();
|
||||
|
||||
void start ();
|
||||
void stop ();
|
||||
|
||||
/**
|
||||
* @param port is optional, if 0 then default peering port is used
|
||||
* @return true if connection attempt was initiated
|
||||
*/
|
||||
bool connect (asio::ip::address ip, uint16_t port = 0);
|
||||
|
||||
nano::tcp_endpoint endpoint () const;
|
||||
|
||||
size_t connection_count () const;
|
||||
size_t attempt_count () const;
|
||||
size_t realtime_count () const;
|
||||
size_t bootstrap_count () const;
|
||||
|
||||
std::vector<std::shared_ptr<socket>> sockets () const;
|
||||
std::vector<std::shared_ptr<tcp_server>> servers () const;
|
||||
|
||||
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
|
||||
|
||||
public: // Events
|
||||
|
@ -52,6 +83,7 @@ public: // Events
|
|||
connection_accepted_event_t connection_accepted;
|
||||
|
||||
private: // Dependencies
|
||||
tcp_config const & config;
|
||||
nano::node & node;
|
||||
nano::stats & stats;
|
||||
nano::logger & logger;
|
||||
|
@ -62,25 +94,43 @@ private:
|
|||
|
||||
void run_cleanup ();
|
||||
void cleanup ();
|
||||
void timeout ();
|
||||
|
||||
enum class accept_result
|
||||
{
|
||||
invalid,
|
||||
accepted,
|
||||
too_many_per_ip,
|
||||
too_many_per_subnetwork,
|
||||
excluded,
|
||||
rejected,
|
||||
error,
|
||||
};
|
||||
|
||||
accept_result accept_one (asio::ip::tcp::socket);
|
||||
accept_result check_limits (asio::ip::address const & ip);
|
||||
enum class connection_type
|
||||
{
|
||||
inbound,
|
||||
outbound,
|
||||
};
|
||||
|
||||
asio::awaitable<void> connect_impl (asio::ip::tcp::endpoint);
|
||||
asio::awaitable<asio::ip::tcp::socket> connect_socket (asio::ip::tcp::endpoint);
|
||||
|
||||
struct accept_return
|
||||
{
|
||||
accept_result result;
|
||||
std::shared_ptr<nano::transport::socket> socket;
|
||||
std::shared_ptr<nano::transport::tcp_server> server;
|
||||
};
|
||||
|
||||
accept_return accept_one (asio::ip::tcp::socket, connection_type);
|
||||
accept_result check_limits (asio::ip::address const & ip, connection_type);
|
||||
asio::awaitable<asio::ip::tcp::socket> accept_socket ();
|
||||
|
||||
size_t count_per_type (connection_type) const;
|
||||
size_t count_per_ip (asio::ip::address const & ip) const;
|
||||
size_t count_per_subnetwork (asio::ip::address const & ip) const;
|
||||
size_t count_attempts (asio::ip::address const & ip) const;
|
||||
|
||||
private:
|
||||
struct entry
|
||||
struct connection
|
||||
{
|
||||
asio::ip::tcp::endpoint endpoint;
|
||||
std::weak_ptr<nano::transport::socket> socket;
|
||||
|
@ -92,20 +142,24 @@ private:
|
|||
}
|
||||
};
|
||||
|
||||
struct attempt
|
||||
{
|
||||
asio::ip::tcp::endpoint endpoint;
|
||||
nano::async::task task;
|
||||
|
||||
std::chrono::steady_clock::time_point const start{ std::chrono::steady_clock::now () };
|
||||
|
||||
asio::ip::address address () const
|
||||
{
|
||||
return endpoint.address ();
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
uint16_t const port;
|
||||
std::size_t const max_inbound_connections;
|
||||
|
||||
// clang-format off
|
||||
class tag_address {};
|
||||
|
||||
using ordered_connections = boost::multi_index_container<entry,
|
||||
mi::indexed_by<
|
||||
mi::hashed_non_unique<mi::tag<tag_address>,
|
||||
mi::const_mem_fun<entry, asio::ip::address, &entry::address>>
|
||||
>>;
|
||||
// clang-format on
|
||||
ordered_connections connections;
|
||||
std::list<connection> connections;
|
||||
std::list<attempt> attempts;
|
||||
|
||||
nano::async::strand strand;
|
||||
|
||||
|
@ -117,5 +171,10 @@ private:
|
|||
mutable nano::mutex mutex;
|
||||
nano::async::task task;
|
||||
std::thread cleanup_thread;
|
||||
|
||||
private:
|
||||
static nano::stat::dir to_stat_dir (connection_type);
|
||||
static std::string_view to_string (connection_type);
|
||||
static nano::transport::socket_endpoint to_socket_endpoint (connection_type);
|
||||
};
|
||||
}
|
|
@ -38,17 +38,6 @@ nano::transport::tcp_server::~tcp_server ()
|
|||
|
||||
node->logger.debug (nano::log::type::tcp_server, "Exiting server: {}", fmt::streamed (remote_endpoint));
|
||||
|
||||
if (socket->type () == nano::transport::socket_type::realtime)
|
||||
{
|
||||
// Clear temporary channel
|
||||
auto exisiting_response_channel (node->network.tcp_channels.find_channel (remote_endpoint));
|
||||
if (exisiting_response_channel != nullptr)
|
||||
{
|
||||
exisiting_response_channel->temporary = false;
|
||||
node->network.tcp_channels.erase (remote_endpoint);
|
||||
}
|
||||
}
|
||||
|
||||
stop ();
|
||||
}
|
||||
|
||||
|
@ -260,7 +249,11 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr<nano::message>
|
|||
{
|
||||
return;
|
||||
}
|
||||
node->network.tcp_channels.message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket });
|
||||
|
||||
release_assert (channel != nullptr);
|
||||
|
||||
channel->set_last_packet_received (std::chrono::steady_clock::now ());
|
||||
node->network.tcp_channels.queue_message (std::move (message), channel);
|
||||
}
|
||||
|
||||
auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status
|
||||
|
@ -296,7 +289,9 @@ auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake con
|
|||
handshake_received = true;
|
||||
|
||||
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
|
||||
node->logger.debug (nano::log::type::tcp_server, "Handshake message received ({})", fmt::streamed (remote_endpoint));
|
||||
node->logger.debug (nano::log::type::tcp_server, "Handshake message received: {} ({})",
|
||||
message.query ? (message.response ? "query + response" : "query") : (message.response ? "response" : "none"),
|
||||
fmt::streamed (remote_endpoint));
|
||||
|
||||
if (message.query)
|
||||
{
|
||||
|
@ -333,6 +328,42 @@ auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake con
|
|||
return handshake_status::handshake; // Handshake is in progress
|
||||
}
|
||||
|
||||
void nano::transport::tcp_server::initiate_handshake ()
|
||||
{
|
||||
auto node = this->node.lock ();
|
||||
if (!node)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
auto query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
|
||||
nano::node_id_handshake message{ node->network_params.network, query };
|
||||
|
||||
node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", fmt::streamed (remote_endpoint));
|
||||
|
||||
auto shared_const_buffer = message.to_shared_const_buffer ();
|
||||
socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
auto node = this_l->node.lock ();
|
||||
if (!node)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (ec)
|
||||
{
|
||||
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error);
|
||||
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake query: {} ({})", ec.message (), fmt::streamed (this_l->remote_endpoint));
|
||||
|
||||
// Stop invalid handshake
|
||||
this_l->stop ();
|
||||
}
|
||||
else
|
||||
{
|
||||
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out);
|
||||
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_initiate, nano::stat::dir::out);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2)
|
||||
{
|
||||
auto node = this->node.lock ();
|
||||
|
@ -634,7 +665,13 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const &
|
|||
return false;
|
||||
}
|
||||
|
||||
remote_node_id = node_id;
|
||||
auto channel_l = node->network.tcp_channels.create (socket, shared_from_this (), node_id);
|
||||
if (!channel_l)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
channel = channel_l;
|
||||
|
||||
socket->type_set (nano::transport::socket_type::realtime);
|
||||
|
||||
node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", fmt::streamed (remote_endpoint));
|
||||
|
|
|
@ -25,6 +25,7 @@ public:
|
|||
void start ();
|
||||
void stop ();
|
||||
|
||||
void initiate_handshake ();
|
||||
void timeout ();
|
||||
void set_last_keepalive (nano::keepalive const & message);
|
||||
std::optional<nano::keepalive> pop_last_keepalive ();
|
||||
|
@ -36,7 +37,6 @@ public:
|
|||
std::atomic<bool> handshake_received{ false };
|
||||
// Remote enpoint used to remove response channel even after socket closing
|
||||
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
|
||||
nano::account remote_node_id{};
|
||||
std::chrono::steady_clock::time_point last_telemetry_req{};
|
||||
|
||||
private:
|
||||
|
@ -74,6 +74,9 @@ private:
|
|||
std::shared_ptr<nano::transport::message_deserializer> message_deserializer;
|
||||
std::optional<nano::keepalive> last_keepalive;
|
||||
|
||||
// Every realtime connection must have an associated channel
|
||||
std::shared_ptr<nano::transport::channel_tcp> channel;
|
||||
|
||||
private: // Visitors
|
||||
class handshake_message_visitor : public nano::message_visitor
|
||||
{
|
||||
|
|
|
@ -1710,21 +1710,49 @@ TEST (rpc, keepalive)
|
|||
TEST (rpc, peers)
|
||||
{
|
||||
nano::test::system system;
|
||||
auto node = add_ipc_enabled_node (system);
|
||||
// Add node2 first to avoid peers with ephemeral ports
|
||||
auto const node2 = system.add_node ();
|
||||
auto node = add_ipc_enabled_node (system);
|
||||
auto const rpc_ctx = add_rpc (system, node);
|
||||
boost::property_tree::ptree request;
|
||||
request.put ("action", "peers");
|
||||
auto response (wait_response (system, rpc_ctx, request));
|
||||
auto & peers_node (response.get_child ("peers"));
|
||||
ASSERT_EQ (1, peers_node.size ());
|
||||
ASSERT_EQ (std::to_string (node->network_params.network.protocol_version), peers_node.get<std::string> ((boost::format ("[::1]:%1%") % node2->network.endpoint ().port ()).str ()));
|
||||
|
||||
auto peer = peers_node.begin ();
|
||||
ASSERT_EQ (peer->first, boost::lexical_cast<std::string> (node2->network.endpoint ()));
|
||||
ASSERT_EQ (std::to_string (node->network_params.network.protocol_version), peers_node.get<std::string> (peer->first));
|
||||
// The previous version of this test had an UDP connection to an arbitrary IP address, so it could check for two peers. This doesn't work with TCP.
|
||||
}
|
||||
|
||||
TEST (rpc, peers_node_id)
|
||||
{
|
||||
nano::test::system system;
|
||||
// Add node2 first to avoid peers with ephemeral ports
|
||||
auto const node2 = system.add_node ();
|
||||
auto node = add_ipc_enabled_node (system);
|
||||
auto const rpc_ctx = add_rpc (system, node);
|
||||
boost::property_tree::ptree request;
|
||||
request.put ("action", "peers");
|
||||
request.put ("peer_details", true);
|
||||
auto response (wait_response (system, rpc_ctx, request));
|
||||
auto & peers_node (response.get_child ("peers"));
|
||||
ASSERT_EQ (1, peers_node.size ());
|
||||
|
||||
auto peer = peers_node.begin ();
|
||||
ASSERT_EQ (peer->first, boost::lexical_cast<std::string> (node2->network.endpoint ()));
|
||||
|
||||
auto tree1 = peer->second;
|
||||
ASSERT_EQ (std::to_string (node->network_params.network.protocol_version), tree1.get<std::string> ("protocol_version"));
|
||||
ASSERT_EQ (node2->node_id.pub.to_node_id (), tree1.get<std::string> ("node_id"));
|
||||
// The previous version of this test had an UDP connection to an arbitrary IP address, so it could check for two peers. This doesn't work with TCP.
|
||||
}
|
||||
|
||||
TEST (rpc, peers_peering_endpoint)
|
||||
{
|
||||
nano::test::system system;
|
||||
// Add node first, so that node2 will connect to node from ephemeral port
|
||||
auto node = add_ipc_enabled_node (system);
|
||||
auto const node2 = system.add_node ();
|
||||
auto const rpc_ctx = add_rpc (system, node);
|
||||
|
@ -1734,10 +1762,10 @@ TEST (rpc, peers_node_id)
|
|||
auto response (wait_response (system, rpc_ctx, request));
|
||||
auto & peers_node (response.get_child ("peers"));
|
||||
ASSERT_EQ (1, peers_node.size ());
|
||||
auto tree1 (peers_node.get_child ((boost::format ("[::1]:%1%") % node2->network.endpoint ().port ()).str ()));
|
||||
ASSERT_EQ (std::to_string (node->network_params.network.protocol_version), tree1.get<std::string> ("protocol_version"));
|
||||
ASSERT_EQ (system.nodes[1]->node_id.pub.to_node_id (), tree1.get<std::string> ("node_id"));
|
||||
// The previous version of this test had an UDP connection to an arbitrary IP address, so it could check for two peers. This doesn't work with TCP.
|
||||
|
||||
auto peer = peers_node.begin ();
|
||||
ASSERT_NE (peer->first, boost::lexical_cast<std::string> (node2->network.endpoint ()));
|
||||
ASSERT_EQ (peer->second.get<std::string> ("peering"), boost::lexical_cast<std::string> (node2->network.endpoint ()));
|
||||
}
|
||||
|
||||
TEST (rpc, version)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue