Bidirectional TCP sockets (#2087)

* Add temporary TCP channels for bootstrap server

* Add response server to tcp channel

* Use node_flags for delay_frontier_confirmation_height_updating

* Test node.bidirectional_tcp

* Set multi_writer for temporary channel

* Clear bootstrap_server type

bootstrap, realtime or undefined

* Better closing of response server & temporary channels

* Build errors fix

* Better asserts

* Apply review changes

* Use range erase by node ID for multiindex

* channel_tcp in bootstrap_server

* Remove mutex from bootstrap_server::stop () to prevent deadlocks

* Revert "channel_tcp in bootstrap_server"
This commit is contained in:
Sergey Kroshnin 2019-06-27 19:46:57 +03:00 committed by Zach Hyatt
commit bba4fd73c8
12 changed files with 209 additions and 54 deletions

View file

@ -2335,7 +2335,7 @@ TEST (network, replace_port)
}
}
auto peers_list (system.nodes[0]->network.list (std::numeric_limits<size_t>::max ()));
ASSERT_EQ (peers_list[0]->get_node_id ().get (), node1->node_id.pub);
ASSERT_EQ (peers_list[0]->get_node_id (), node1->node_id.pub);
auto channel (std::make_shared<nano::transport::channel_udp> (system.nodes[0]->network.udp_channels, node1->network.endpoint ()));
system.nodes[0]->network.send_keepalive (channel);
system.deadline_set (5s);

View file

@ -2691,7 +2691,12 @@ TEST (node, peers)
{
ASSERT_NO_ERROR (system.poll ());
}
// Wait to finish TCP node ID handshakes
system.deadline_set (10s);
while (system.nodes.back ()->network.response_channels.size () == 0 || system.nodes.front ()->network.response_channels.size () == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
// Confirm that the peers match with the endpoints we are expecting
ASSERT_EQ (1, system.nodes.front ()->network.size ());
auto list1 (system.nodes[0]->network.list (2));
@ -2838,6 +2843,75 @@ TEST (node, dont_write_lock_node)
finished_promise.set_value ();
}
TEST (node, bidirectional_tcp)
{
nano::system system;
nano::node_flags node_flags;
node_flags.delay_frontier_confirmation_height_updating = true;
node_flags.disable_udp = true; // Disable UDP connections
auto node1 = system.add_node (nano::node_config (24000, system.logging), node_flags);
nano::node_config node_config (24001, system.logging);
node_config.tcp_incoming_connections_max = 0; // Disable incoming TCP connections for node 2
auto node2 = system.add_node (node_config, node_flags);
// Check network connections
ASSERT_EQ (1, node1->network.size ());
ASSERT_EQ (1, node2->network.size ());
auto list1 (node1->network.list (1));
ASSERT_EQ (nano::transport::transport_type::tcp, list1[0]->get_type ());
ASSERT_NE (node2->network.endpoint (), list1[0]->get_endpoint ()); // Ephemeral port
ASSERT_EQ (node2->node_id.pub, list1[0]->get_node_id ());
auto list2 (node2->network.list (1));
ASSERT_EQ (nano::transport::transport_type::tcp, list2[0]->get_type ());
ASSERT_EQ (node1->network.endpoint (), list2[0]->get_endpoint ());
ASSERT_EQ (node1->node_id.pub, list2[0]->get_node_id ());
// Test block propagation from node 1
nano::genesis genesis;
nano::keypair key;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, node1->work_generate_blocking (genesis.hash ())));
node1->process_active (send1);
node1->block_processor.flush ();
system.deadline_set (5s);
while (!node1->ledger.block_exists (send1->hash ()) || !node2->ledger.block_exists (send1->hash ()))
{
ASSERT_NO_ERROR (system.poll ());
}
// Test block confirmation from node 1
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
bool confirmed (false);
system.deadline_set (10s);
while (!confirmed)
{
auto transaction1 (node1->store.tx_begin_read ());
auto transaction2 (node2->store.tx_begin_read ());
confirmed = node1->ledger.block_confirmed (transaction1, send1->hash ()) && node2->ledger.block_confirmed (transaction2, send1->hash ());
ASSERT_NO_ERROR (system.poll ());
}
// Test block propagation from node 2
{
auto transaction (system.wallet (0)->wallets.tx_begin_write ());
system.wallet (0)->store.erase (transaction, nano::test_genesis_key.pub);
}
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, node1->work_generate_blocking (send1->hash ())));
node2->process_active (send2);
node2->block_processor.flush ();
system.deadline_set (5s);
while (!node1->ledger.block_exists (send2->hash ()) || !node2->ledger.block_exists (send2->hash ()))
{
ASSERT_NO_ERROR (system.poll ());
}
// Test block confirmation from node 2
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
confirmed = false;
system.deadline_set (10s);
while (!confirmed)
{
auto transaction1 (node1->store.tx_begin_read ());
auto transaction2 (node2->store.tx_begin_read ());
confirmed = node1->ledger.block_confirmed (transaction1, send2->hash ()) && node2->ledger.block_confirmed (transaction2, send2->hash ());
ASSERT_NO_ERROR (system.poll ());
}
}
namespace nano
{
TEST (confirmation_height, prioritize_frontiers)

View file

@ -1911,14 +1911,21 @@ nano::bootstrap_server::~bootstrap_server ()
{
node->logger.try_log ("Exiting incoming TCP/bootstrap server");
}
if (bootstrap_connection)
if (type == nano::bootstrap_server_type::bootstrap)
{
--node->bootstrap.bootstrap_count;
}
if (node_id_handshake_finished)
else if (type == nano::bootstrap_server_type::realtime)
{
--node->bootstrap.realtime_count;
node->network.response_channels.remove (remote_endpoint);
// Clear temporary channel
auto exisiting_response_channel (node->network.tcp_channels.find_channel (remote_endpoint));
if (exisiting_response_channel != nullptr)
{
exisiting_response_channel->server = false;
node->network.tcp_channels.erase (remote_endpoint);
}
}
stop ();
std::lock_guard<std::mutex> lock (node->bootstrap.mutex);
@ -1929,7 +1936,6 @@ void nano::bootstrap_server::stop ()
{
if (!stopped.exchange (true))
{
std::lock_guard<std::mutex> lock (mutex);
if (socket != nullptr)
{
socket->close ();
@ -2154,7 +2160,7 @@ void nano::bootstrap_server::receive_keepalive_action (boost::system::error_code
std::unique_ptr<nano::keepalive> request (new nano::keepalive (error, stream, header_a));
if (!error)
{
if (node_id_handshake_finished)
if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
@ -2179,7 +2185,7 @@ void nano::bootstrap_server::receive_publish_action (boost::system::error_code c
std::unique_ptr<nano::publish> request (new nano::publish (error, stream, header_a));
if (!error)
{
if (node_id_handshake_finished)
if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
@ -2204,7 +2210,7 @@ void nano::bootstrap_server::receive_confirm_req_action (boost::system::error_co
std::unique_ptr<nano::confirm_req> request (new nano::confirm_req (error, stream, header_a));
if (!error)
{
if (node_id_handshake_finished)
if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
@ -2226,7 +2232,7 @@ void nano::bootstrap_server::receive_confirm_ack_action (boost::system::error_co
std::unique_ptr<nano::confirm_ack> request (new nano::confirm_ack (error, stream, header_a));
if (!error)
{
if (node_id_handshake_finished)
if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
@ -2248,7 +2254,7 @@ void nano::bootstrap_server::receive_node_id_handshake_action (boost::system::er
std::unique_ptr<nano::node_id_handshake> request (new nano::node_id_handshake (error, stream, header_a));
if (!error)
{
if (!node_id_handshake_finished && !node->flags.disable_tcp_realtime)
if (type == nano::bootstrap_server_type::undefined && !node->flags.disable_tcp_realtime)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
@ -2356,7 +2362,7 @@ public:
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id);
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
}
void confirm_req (nano::confirm_req const & message_a) override
@ -2364,7 +2370,7 @@ public:
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id);
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
}
void confirm_ack (nano::confirm_ack const & message_a) override
@ -2372,7 +2378,7 @@ public:
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id);
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
}
void bulk_pull (nano::bulk_pull const &) override
@ -2429,10 +2435,11 @@ public:
}
else if (message_a.response)
{
connection->remote_node_id = message_a.response->first;
if (!connection->node->network.syn_cookies.validate (nano::transport::map_tcp_to_endpoint (connection->remote_endpoint), connection->remote_node_id, message_a.response->second) && connection->remote_node_id != connection->node->node_id.pub)
auto node_id (message_a.response->first);
connection->remote_node_id = node_id;
if (!connection->node->network.syn_cookies.validate (nano::transport::map_tcp_to_endpoint (connection->remote_endpoint), node_id, message_a.response->second) && node_id != connection->node->node_id.pub)
{
connection->node_id_handshake_finished = true;
connection->type = nano::bootstrap_server_type::realtime;
++connection->node->bootstrap.realtime_count;
connection->finish_request_async ();
}
@ -2446,9 +2453,10 @@ public:
{
connection->finish_request_async ();
}
assert (connection->remote_node_id.is_zero () || connection->type == nano::bootstrap_server_type::realtime);
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id);
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
}
std::shared_ptr<nano::bootstrap_server> connection;
@ -2464,12 +2472,12 @@ void nano::bootstrap_server::run_next ()
bool nano::bootstrap_server::is_bootstrap_connection ()
{
if (!bootstrap_connection && !node->flags.disable_bootstrap_listener && node->bootstrap.bootstrap_count < node->config.bootstrap_connections_max)
if (type == nano::bootstrap_server_type::undefined && !node->flags.disable_bootstrap_listener && node->bootstrap.bootstrap_count < node->config.bootstrap_connections_max)
{
++node->bootstrap.bootstrap_count;
bootstrap_connection = true;
type = nano::bootstrap_server_type::bootstrap;
}
return bootstrap_connection;
return type == nano::bootstrap_server_type::bootstrap;
}
/**

View file

@ -289,6 +289,13 @@ private:
std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_listener & bootstrap_listener, const std::string & name);
class message;
enum class bootstrap_server_type
{
undefined,
bootstrap,
realtime,
realtime_response_server // special type for tcp channel response server
};
class bootstrap_server final : public std::enable_shared_from_this<nano::bootstrap_server>
{
public:
@ -317,9 +324,9 @@ public:
std::mutex mutex;
std::queue<std::unique_ptr<nano::message>> requests;
std::atomic<bool> stopped{ false };
std::atomic<bool> bootstrap_connection{ false };
std::atomic<bool> node_id_handshake_finished{ false };
std::atomic<nano::bootstrap_server_type> type{ nano::bootstrap_server_type::undefined };
std::atomic<bool> keepalive_first{ true };
// Remote enpoint used to remove response channel even after socket closing
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
nano::account remote_node_id{ 0 };
};

View file

@ -2529,7 +2529,7 @@ void nano::json_handler::peers ()
{
boost::property_tree::ptree pending_tree;
pending_tree.put ("protocol_version", std::to_string (channel->get_network_version ()));
auto node_id_l (channel->get_node_id ());
auto node_id_l (channel->get_node_id_optional ());
if (node_id_l.is_initialized ())
{
pending_tree.put ("node_id", node_id_l.get ().to_account ());

View file

@ -63,7 +63,7 @@ std::shared_ptr<nano::node> nano::system::add_node (nano::node_config const & no
new1 = node1->network.size ();
new2 = node2->network.size ();
} while (new1 == starting1 || new2 == starting2);
if (type_a == nano::transport::transport_type::tcp && !node_flags_a.disable_tcp_realtime)
if (type_a == nano::transport::transport_type::tcp && node_config_a.tcp_incoming_connections_max != 0 && !node_flags_a.disable_tcp_realtime)
{
// Wait for initial connection finish
decltype (starting_listener1) new_listener1;

View file

@ -11,10 +11,17 @@ socket (socket_a)
nano::transport::channel_tcp::~channel_tcp ()
{
std::lock_guard<std::mutex> lk (channel_mutex);
if (socket)
// Close socket. Exception: socket is used by bootstrap_server
if (socket && !server)
{
socket->close ();
}
// Remove response server
if (response_server != nullptr)
{
response_server->stop ();
response_server = nullptr;
}
}
size_t nano::transport::channel_tcp::hash_code () const
@ -90,12 +97,19 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::cha
auto existing (channels.get<endpoint_tag> ().find (endpoint));
if (existing == channels.get<endpoint_tag> ().end ())
{
auto node_id (channel_a->get_node_id ());
if (!channel_a->server)
{
channels.get<node_id_tag> ().erase (node_id);
}
channels.get<endpoint_tag> ().insert ({ channel_a });
error = false;
lock.unlock ();
node.network.channel_observer (channel_a);
// Remove UDP channel to same IP:port if exists
node.network.udp_channels.erase (udp_endpoint);
// Remove UDP channels with same node ID
node.network.udp_channels.clean_node_id (node_id);
}
}
return error;
@ -226,7 +240,7 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer ()
return result;
}
void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a)
void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a, std::shared_ptr<nano::socket> socket_a, nano::bootstrap_server_type type_a)
{
if (!stopped)
{
@ -242,11 +256,31 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
{
node.network.process_message (message_a, channel);
}
else if (!node.flags.disable_udp || (message_a.header.type != nano::message_type::confirm_req && message_a.header.type != nano::message_type::confirm_ack))
else if (!node_id_a.is_zero ())
{
// confirm_req & confirm_ack are only message types that can produce response
auto udp_channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a)));
node.network.process_message (message_a, udp_channel);
// Add temporary channel
socket_a->set_writer_concurrency (nano::socket::concurrency::multi_writer);
auto temporary_channel (std::make_shared<nano::transport::channel_tcp> (node, socket_a));
assert (endpoint_a == temporary_channel->get_tcp_endpoint ());
temporary_channel->set_node_id (node_id_a);
temporary_channel->set_network_version (message_a.header.version_using);
temporary_channel->set_last_packet_received (std::chrono::steady_clock::now ());
temporary_channel->set_last_packet_sent (std::chrono::steady_clock::now ());
temporary_channel->server = true;
// Don't insert temporary channels for response_server
assert (type_a == nano::bootstrap_server_type::realtime);
if (type_a == nano::bootstrap_server_type::realtime)
{
insert (temporary_channel);
}
node.network.process_message (message_a, temporary_channel);
}
else
{
// Initial node_id_handshake request without node ID
assert (message_a.header.type == nano::message_type::node_id_handshake);
assert (type_a == nano::bootstrap_server_type::undefined);
node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
}
}
}
@ -291,6 +325,7 @@ void nano::transport::tcp_channels::start ()
void nano::transport::tcp_channels::stop ()
{
stopped = true;
std::unique_lock<std::mutex> lock (mutex);
// Close all TCP sockets
for (auto i (channels.begin ()), j (channels.end ()); i != j; ++i)
{
@ -298,6 +333,12 @@ void nano::transport::tcp_channels::stop ()
{
i->channel->socket->close ();
}
// Remove response server
if (i->channel->response_server != nullptr)
{
i->channel->response_server->stop ();
i->channel->response_server = nullptr;
}
}
}
@ -496,7 +537,18 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
{
channel_a->set_network_version (header.version_using);
auto node_id (message.response->first);
if (!node_l->network.syn_cookies.validate (endpoint_a, node_id, message.response->second) && node_id != node_l->node_id.pub && !node_l->network.tcp_channels.find_node_id (node_id))
bool process (!node_l->network.syn_cookies.validate (endpoint_a, node_id, message.response->second) && node_id != node_l->node_id.pub);
if (process)
{
/* If node ID is known, don't establish new connection
Exception: temporary channels from bootstrap_server */
auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id));
if (existing_channel)
{
process = existing_channel->server;
}
}
if (process)
{
channel_a->set_node_id (node_id);
channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
@ -519,6 +571,12 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
{
callback_a (channel_a);
}
// Listen for possible responses
channel_a->response_server = std::make_shared<nano::bootstrap_server> (channel_a->socket, node_l);
channel_a->response_server->keepalive_first = false;
channel_a->response_server->type = nano::bootstrap_server_type::realtime_response_server;
channel_a->response_server->remote_node_id = channel_a->get_node_id ();
channel_a->response_server->receive ();
}
else
{
@ -531,7 +589,6 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
}
});
}
// If node ID is known, don't establish new connection
}
else
{

View file

@ -13,6 +13,8 @@
namespace nano
{
class bootstrap_server;
enum class bootstrap_server_type;
namespace transport
{
class tcp_channels;
@ -34,6 +36,8 @@ namespace transport
return &node == &other_a.node && socket == other_a.socket;
}
std::shared_ptr<nano::socket> socket;
std::shared_ptr<nano::bootstrap_server> response_server;
bool server{ false };
nano::endpoint get_endpoint () const override
{
@ -85,7 +89,7 @@ namespace transport
void receive ();
void start ();
void stop ();
void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &);
void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr<nano::socket>, nano::bootstrap_server_type);
void process_keepalive (nano::keepalive const &, nano::tcp_endpoint const &, bool);
bool max_ip_connections (nano::tcp_endpoint const &);
// Should we reach out to this endpoint with a keepalive message
@ -143,16 +147,9 @@ namespace transport
}
nano::account node_id () const
{
auto node_id_l (channel->get_node_id ());
if (node_id_l.is_initialized ())
{
return node_id_l.get ();
}
else
{
assert (false);
return 0;
}
auto node_id (channel->get_node_id ());
assert (!node_id.is_zero ());
return node_id;
}
};
class tcp_endpoint_attempt final

View file

@ -97,12 +97,25 @@ namespace transport
last_packet_sent = time_a;
}
boost::optional<nano::account> get_node_id () const
boost::optional<nano::account> get_node_id_optional () const
{
std::lock_guard<std::mutex> lk (channel_mutex);
return node_id;
}
nano::account get_node_id () const
{
std::lock_guard<std::mutex> lk (channel_mutex);
if (node_id.is_initialized ())
{
return node_id.get ();
}
else
{
return 0;
}
}
void set_node_id (nano::account node_id_a)
{
std::lock_guard<std::mutex> lk (channel_mutex);

View file

@ -215,6 +215,12 @@ std::shared_ptr<nano::transport::channel_udp> nano::transport::udp_channels::fin
return result;
}
void nano::transport::udp_channels::clean_node_id (nano::account const & node_id_a)
{
std::lock_guard<std::mutex> lock (mutex);
channels.get<node_id_tag> ().erase (node_id_a);
}
void nano::transport::udp_channels::clean_node_id (nano::endpoint const & endpoint_a, nano::account const & node_id_a)
{
std::lock_guard<std::mutex> lock (mutex);

View file

@ -70,6 +70,7 @@ namespace transport
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t) const;
bool store_all (bool = true);
std::shared_ptr<nano::transport::channel_udp> find_node_id (nano::account const &);
void clean_node_id (nano::account const &);
void clean_node_id (nano::endpoint const &, nano::account const &);
// Get the next peer for attempting a tcp bootstrap connection
nano::tcp_endpoint bootstrap_peer (uint8_t connection_protocol_version_min = nano::protocol_version_reasonable_min);
@ -133,15 +134,7 @@ namespace transport
}
nano::account node_id () const
{
auto node_id_l (channel->get_node_id ());
if (node_id_l.is_initialized ())
{
return node_id_l.get ();
}
else
{
return 0;
}
return channel->get_node_id ();
}
};
class endpoint_attempt final

View file

@ -1949,7 +1949,7 @@ void nano_qt::advanced_actions::refresh_peers ()
version->setData (QVariant (channel->get_network_version ()), Qt::DisplayRole);
items.push_back (version);
QString node_id ("");
auto node_id_l (channel->get_node_id ());
auto node_id_l (channel->get_node_id_optional ());
if (node_id_l.is_initialized ())
{
node_id = node_id_l.get ().to_account ().c_str ();