Remove preferred TPC response channels (#2254)
As not required with bidirectional TCP & node_id search
This commit is contained in:
parent
100e4de6ab
commit
afa28a926f
9 changed files with 6 additions and 132 deletions
|
@ -130,11 +130,6 @@ TEST (network, send_node_id_handshake_tcp)
|
|||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
system.deadline_set (5s);
|
||||
while (system.nodes[0]->network.response_channels.size () != 1 || node1->network.response_channels.size () != 1)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
system.deadline_set (5s);
|
||||
while (system.nodes[0]->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) < initial_keepalive + 2)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
|
|
|
@ -2920,7 +2920,7 @@ TEST (node, peers)
|
|||
}
|
||||
// Wait to finish TCP node ID handshakes
|
||||
system.deadline_set (10s);
|
||||
while (system.nodes.back ()->network.response_channels.size () == 0 || system.nodes.front ()->network.response_channels.size () == 0)
|
||||
while (system.nodes[0]->bootstrap.realtime_count == 0 || system.nodes[1]->bootstrap.realtime_count == 0)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
|
|
@ -93,7 +93,6 @@ nano::bootstrap_server::~bootstrap_server ()
|
|||
else if (type == nano::bootstrap_server_type::realtime)
|
||||
{
|
||||
--node->bootstrap.realtime_count;
|
||||
node->network.response_channels.remove (remote_endpoint);
|
||||
// Clear temporary channel
|
||||
auto exisiting_response_channel (node->network.tcp_channels.find_channel (remote_endpoint));
|
||||
if (exisiting_response_channel != nullptr)
|
||||
|
@ -521,15 +520,10 @@ public:
|
|||
virtual ~request_response_visitor () = default;
|
||||
void keepalive (nano::keepalive const & message_a) override
|
||||
{
|
||||
bool first_keepalive (connection->keepalive_first);
|
||||
if (first_keepalive)
|
||||
{
|
||||
connection->keepalive_first = false;
|
||||
}
|
||||
connection->finish_request_async ();
|
||||
auto connection_l (connection->shared_from_this ());
|
||||
connection->node->background ([connection_l, message_a, first_keepalive]() {
|
||||
connection_l->node->network.tcp_channels.process_keepalive (message_a, connection_l->remote_endpoint, first_keepalive);
|
||||
connection->node->background ([connection_l, message_a]() {
|
||||
connection_l->node->network.tcp_channels.process_keepalive (message_a, connection_l->remote_endpoint);
|
||||
});
|
||||
}
|
||||
void publish (nano::publish const & message_a) override
|
||||
|
|
|
@ -70,7 +70,6 @@ public:
|
|||
std::queue<std::unique_ptr<nano::message>> requests;
|
||||
std::atomic<bool> stopped{ false };
|
||||
std::atomic<nano::bootstrap_server_type> type{ nano::bootstrap_server_type::undefined };
|
||||
std::atomic<bool> keepalive_first{ true };
|
||||
// Remote enpoint used to remove response channel even after socket closing
|
||||
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
|
||||
nano::account remote_node_id{ 0 };
|
||||
|
|
|
@ -718,42 +718,6 @@ std::shared_ptr<nano::transport::channel> nano::network::find_node_id (nano::acc
|
|||
return result;
|
||||
}
|
||||
|
||||
std::shared_ptr<nano::transport::channel> nano::network::find_response_channel (nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a)
|
||||
{
|
||||
// Search by node ID
|
||||
std::shared_ptr<nano::transport::channel> result (find_node_id (node_id_a));
|
||||
if (!result)
|
||||
{
|
||||
// Search in response channels
|
||||
auto channels_list (response_channels.search (endpoint_a));
|
||||
// TCP
|
||||
for (auto & i : channels_list)
|
||||
{
|
||||
auto search_channel (tcp_channels.find_channel (i));
|
||||
if (search_channel != nullptr)
|
||||
{
|
||||
result = search_channel;
|
||||
break;
|
||||
}
|
||||
}
|
||||
// UDP
|
||||
if (!result)
|
||||
{
|
||||
for (auto & i : channels_list)
|
||||
{
|
||||
auto udp_endpoint (nano::transport::map_tcp_to_endpoint (i));
|
||||
auto search_channel (udp_channels.channel (udp_endpoint));
|
||||
if (search_channel != nullptr)
|
||||
{
|
||||
result = search_channel;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
nano::endpoint nano::network::endpoint ()
|
||||
{
|
||||
return udp_channels.get_local_endpoint ();
|
||||
|
@ -910,48 +874,6 @@ void nano::message_buffer_manager::stop ()
|
|||
condition.notify_all ();
|
||||
}
|
||||
|
||||
void nano::response_channels::add (nano::tcp_endpoint const & endpoint_a, std::vector<nano::tcp_endpoint> insert_channels)
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (response_channels_mutex);
|
||||
channels.emplace (endpoint_a, insert_channels);
|
||||
}
|
||||
|
||||
std::vector<nano::tcp_endpoint> nano::response_channels::search (nano::tcp_endpoint const & endpoint_a)
|
||||
{
|
||||
std::vector<nano::tcp_endpoint> result;
|
||||
nano::lock_guard<std::mutex> lock (response_channels_mutex);
|
||||
auto existing (channels.find (endpoint_a));
|
||||
if (existing != channels.end ())
|
||||
{
|
||||
result = existing->second;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void nano::response_channels::remove (nano::tcp_endpoint const & endpoint_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (response_channels_mutex);
|
||||
channels.erase (endpoint_a);
|
||||
}
|
||||
|
||||
size_t nano::response_channels::size ()
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (response_channels_mutex);
|
||||
return channels.size ();
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::seq_con_info_component> nano::response_channels::collect_seq_con_info (std::string const & name)
|
||||
{
|
||||
size_t channels_count = 0;
|
||||
{
|
||||
nano::lock_guard<std::mutex> response_channels_guard (response_channels_mutex);
|
||||
channels_count = channels.size ();
|
||||
}
|
||||
auto composite = std::make_unique<seq_con_info_composite> (name);
|
||||
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "channels", channels_count, sizeof (decltype (channels)::value_type) }));
|
||||
return composite;
|
||||
}
|
||||
|
||||
boost::optional<nano::uint256_union> nano::syn_cookies::assign (nano::endpoint const & endpoint_a)
|
||||
{
|
||||
auto ip_addr (endpoint_a.address ());
|
||||
|
|
|
@ -65,22 +65,6 @@ private:
|
|||
std::vector<nano::message_buffer> entries;
|
||||
bool stopped;
|
||||
};
|
||||
/**
|
||||
* Response channels for TCP realtime network
|
||||
*/
|
||||
class response_channels final
|
||||
{
|
||||
public:
|
||||
void add (nano::tcp_endpoint const &, std::vector<nano::tcp_endpoint>);
|
||||
std::vector<nano::tcp_endpoint> search (nano::tcp_endpoint const &);
|
||||
void remove (nano::tcp_endpoint const &);
|
||||
size_t size ();
|
||||
std::unique_ptr<seq_con_info_component> collect_seq_con_info (std::string const &);
|
||||
|
||||
private:
|
||||
std::mutex response_channels_mutex;
|
||||
std::unordered_map<nano::tcp_endpoint, std::vector<nano::tcp_endpoint>> channels;
|
||||
};
|
||||
/**
|
||||
* Node ID cookies for node ID handshakes
|
||||
*/
|
||||
|
@ -158,9 +142,6 @@ public:
|
|||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t) const;
|
||||
// Get the next peer for attempting a tcp bootstrap connection
|
||||
nano::tcp_endpoint bootstrap_peer ();
|
||||
// Response channels
|
||||
nano::response_channels response_channels;
|
||||
std::shared_ptr<nano::transport::channel> find_response_channel (nano::tcp_endpoint const &, nano::account const &);
|
||||
nano::endpoint endpoint ();
|
||||
void cleanup (std::chrono::steady_clock::time_point const &);
|
||||
void ongoing_cleanup ();
|
||||
|
|
|
@ -600,7 +600,6 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (node & node, const
|
|||
composite->add_component (collect_seq_con_info (node.bootstrap, "bootstrap"));
|
||||
composite->add_component (node.network.tcp_channels.collect_seq_con_info ("tcp_channels"));
|
||||
composite->add_component (node.network.udp_channels.collect_seq_con_info ("udp_channels"));
|
||||
composite->add_component (node.network.response_channels.collect_seq_con_info ("response_channels"));
|
||||
composite->add_component (node.network.syn_cookies.collect_seq_con_info ("syn_cookies"));
|
||||
composite->add_component (collect_seq_con_info (node.observers, "observers"));
|
||||
composite->add_component (collect_seq_con_info (node.wallets, "wallets"));
|
||||
|
|
|
@ -251,7 +251,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
|
|||
}
|
||||
else
|
||||
{
|
||||
channel = node.network.find_response_channel (endpoint_a, node_id_a);
|
||||
channel = node.network.find_node_id (node_id_a);
|
||||
if (channel)
|
||||
{
|
||||
node.network.process_message (message_a, channel);
|
||||
|
@ -286,31 +286,16 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
|
|||
}
|
||||
}
|
||||
|
||||
void nano::transport::tcp_channels::process_keepalive (nano::keepalive const & message_a, nano::tcp_endpoint const & endpoint_a, bool keepalive_first)
|
||||
void nano::transport::tcp_channels::process_keepalive (nano::keepalive const & message_a, nano::tcp_endpoint const & endpoint_a)
|
||||
{
|
||||
if (!max_ip_connections (endpoint_a))
|
||||
{
|
||||
// Check for special node port data
|
||||
std::vector<nano::tcp_endpoint> insert_response_channels;
|
||||
auto peer0 (message_a.peers[0]);
|
||||
auto peer1 (message_a.peers[1]);
|
||||
if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
|
||||
{
|
||||
nano::endpoint new_endpoint (endpoint_a.address (), peer0.port ());
|
||||
node.network.merge_peer (new_endpoint);
|
||||
if (keepalive_first)
|
||||
{
|
||||
insert_response_channels.push_back (nano::transport::map_endpoint_to_tcp (new_endpoint));
|
||||
}
|
||||
}
|
||||
if (peer1.address () != boost::asio::ip::address_v6{} && peer1.port () != 0 && keepalive_first)
|
||||
{
|
||||
insert_response_channels.push_back (nano::transport::map_endpoint_to_tcp (peer1));
|
||||
}
|
||||
// Insert preferred response channels from first TCP keepalive
|
||||
if (!insert_response_channels.empty ())
|
||||
{
|
||||
node.network.response_channels.add (endpoint_a, insert_response_channels);
|
||||
}
|
||||
auto udp_channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a), node.network_params.protocol.protocol_version));
|
||||
node.network.process_message (message_a, udp_channel);
|
||||
|
@ -577,7 +562,6 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
|
|||
}
|
||||
// Listen for possible responses
|
||||
channel_a->response_server = std::make_shared<nano::bootstrap_server> (channel_a->socket, node_l);
|
||||
channel_a->response_server->keepalive_first = false;
|
||||
channel_a->response_server->type = nano::bootstrap_server_type::realtime_response_server;
|
||||
channel_a->response_server->remote_node_id = channel_a->get_node_id ();
|
||||
channel_a->response_server->receive ();
|
||||
|
|
|
@ -90,7 +90,7 @@ namespace transport
|
|||
void start ();
|
||||
void stop ();
|
||||
void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr<nano::socket>, nano::bootstrap_server_type);
|
||||
void process_keepalive (nano::keepalive const &, nano::tcp_endpoint const &, bool);
|
||||
void process_keepalive (nano::keepalive const &, nano::tcp_endpoint const &);
|
||||
bool max_ip_connections (nano::tcp_endpoint const &);
|
||||
// Should we reach out to this endpoint with a keepalive message
|
||||
bool reachout (nano::endpoint const &);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue