Don't bind to UDP socket with --disable_udp flag (#2464)

* don't bind to UDP socket with --disable_udp flag
* don't start port mapping with both UDP & TCP sockets disabled
* use predefined port in `nano::network::endpoint ()` function rather than call to UDP socket
* update TCP (bootstrap) server `endpoint ()` function to return empty endpoint if server is not started
* update several tests to run twice: with UDP only and with TCP only
* attepmt to start realtime TCP connection to RPC bootstrap peer if UDP is disabled
* make UDP socket unique_ptr
This commit is contained in:
Sergey Kroshnin 2020-01-20 23:33:32 +03:00 committed by GitHub
commit e9aae5cb6e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 273 additions and 128 deletions

View file

@ -275,7 +275,17 @@ TEST (network, send_valid_confirm_ack)
std::vector<nano::transport::transport_type> types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp };
for (auto & type : types)
{
nano::system system (2, type);
nano::node_flags node_flags;
if (type == nano::transport::transport_type::tcp)
{
node_flags.disable_udp = true;
}
else
{
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
}
nano::system system (2, type, node_flags);
auto & node1 (*system.nodes[0]);
auto & node2 (*system.nodes[1]);
nano::keypair key2;
@ -301,7 +311,17 @@ TEST (network, send_valid_publish)
std::vector<nano::transport::transport_type> types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp };
for (auto & type : types)
{
nano::system system (2, type);
nano::node_flags node_flags;
if (type == nano::transport::transport_type::tcp)
{
node_flags.disable_udp = true;
}
else
{
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
}
nano::system system (2, type, node_flags);
auto & node1 (*system.nodes[0]);
auto & node2 (*system.nodes[1]);
node1.bootstrap_initiator.stop ();
@ -381,7 +401,17 @@ TEST (receivable_processor, send_with_receive)
std::vector<nano::transport::transport_type> types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp };
for (auto & type : types)
{
nano::system system (2, type);
nano::node_flags node_flags;
if (type == nano::transport::transport_type::tcp)
{
node_flags.disable_udp = true;
}
else
{
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
}
nano::system system (2, type, node_flags);
auto & node1 (*system.nodes[0]);
auto & node2 (*system.nodes[1]);
auto amount (std::numeric_limits<nano::uint128_t>::max ());

View file

@ -1109,6 +1109,26 @@ TEST (node_flags, disable_tcp_realtime)
ASSERT_EQ (nano::transport::transport_type::udp, list2[0]->get_type ());
}
TEST (node_flags, disable_tcp_realtime_and_bootstrap_listener)
{
nano::system system (1);
auto node1 = system.nodes[0];
nano::node_flags node_flags;
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
auto node2 = system.add_node (nano::node_config (nano::get_available_port (), system.logging), node_flags);
ASSERT_EQ (nano::tcp_endpoint (boost::asio::ip::address_v6::loopback (), 0), node2->bootstrap.endpoint ());
ASSERT_NE (nano::endpoint (boost::asio::ip::address_v6::loopback (), 0), node2->network.endpoint ());
ASSERT_EQ (1, node1->network.size ());
auto list1 (node1->network.list (2));
ASSERT_EQ (node2->network.endpoint (), list1[0]->get_endpoint ());
ASSERT_EQ (nano::transport::transport_type::udp, list1[0]->get_type ());
ASSERT_EQ (1, node2->network.size ());
auto list2 (node2->network.list (2));
ASSERT_EQ (node1->network.endpoint (), list2[0]->get_endpoint ());
ASSERT_EQ (nano::transport::transport_type::udp, list2[0]->get_type ());
}
TEST (node_flags, disable_udp)
{
nano::system system (1);
@ -1118,6 +1138,8 @@ TEST (node_flags, disable_udp)
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, nano::node_config (nano::get_available_port (), system.logging), system.work, node_flags));
system.nodes.push_back (node2);
node2->start ();
ASSERT_EQ (nano::endpoint (boost::asio::ip::address_v6::loopback (), 0), node2->network.udp_channels.get_local_endpoint ());
ASSERT_NE (nano::endpoint (boost::asio::ip::address_v6::loopback (), 0), node2->network.endpoint ());
// Send UDP message
auto channel (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, node2->network.endpoint (), node2->network_params.protocol.protocol_version));
node1->network.send_keepalive (channel);
@ -1302,65 +1324,79 @@ TEST (node, fork_flip)
TEST (node, fork_multi_flip)
{
nano::system system (2);
auto & node1 (*system.nodes[0]);
auto & node2 (*system.nodes[1]);
ASSERT_EQ (1, node1.network.size ());
nano::keypair key1;
nano::genesis genesis;
auto send1 (std::make_shared<nano::send_block> (genesis.hash (), key1.pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())));
nano::publish publish1 (send1);
nano::keypair key2;
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())));
nano::publish publish2 (send2);
auto send3 (std::make_shared<nano::send_block> (publish2.block->hash (), key2.pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (publish2.block->hash ())));
nano::publish publish3 (send3);
node1.network.process_message (publish1, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.block_processor.flush ();
node2.network.process_message (publish2, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.network.process_message (publish3, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.block_processor.flush ();
ASSERT_EQ (1, node1.active.size ());
ASSERT_EQ (2, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
node1.network.process_message (publish2, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (publish3, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.block_processor.flush ();
node2.network.process_message (publish1, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.block_processor.flush ();
nano::unique_lock<std::mutex> lock (node2.active.mutex);
auto conflict (node2.active.roots.find (nano::qualified_root (genesis.hash (), genesis.hash ())));
ASSERT_NE (node2.active.roots.end (), conflict);
auto votes1 (conflict->election);
ASSERT_NE (nullptr, votes1);
ASSERT_EQ (1, votes1->last_votes.size ());
lock.unlock ();
std::vector<nano::transport::transport_type> types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp };
for (auto & type : types)
{
auto transaction (node1.store.tx_begin_read ());
ASSERT_TRUE (node1.store.block_exists (transaction, publish1.block->hash ()));
nano::node_flags node_flags;
if (type == nano::transport::transport_type::tcp)
{
node_flags.disable_udp = true;
}
else
{
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
}
nano::system system (2, type, node_flags);
auto & node1 (*system.nodes[0]);
auto & node2 (*system.nodes[1]);
ASSERT_EQ (1, node1.network.size ());
nano::keypair key1;
nano::genesis genesis;
auto send1 (std::make_shared<nano::send_block> (genesis.hash (), key1.pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())));
nano::publish publish1 (send1);
nano::keypair key2;
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())));
nano::publish publish2 (send2);
auto send3 (std::make_shared<nano::send_block> (publish2.block->hash (), key2.pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (publish2.block->hash ())));
nano::publish publish3 (send3);
node1.network.process_message (publish1, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.block_processor.flush ();
node2.network.process_message (publish2, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.network.process_message (publish3, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.block_processor.flush ();
ASSERT_EQ (1, node1.active.size ());
ASSERT_EQ (2, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
node1.network.process_message (publish2, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (publish3, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.block_processor.flush ();
node2.network.process_message (publish1, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.block_processor.flush ();
nano::unique_lock<std::mutex> lock (node2.active.mutex);
auto conflict (node2.active.roots.find (nano::qualified_root (genesis.hash (), genesis.hash ())));
ASSERT_NE (node2.active.roots.end (), conflict);
auto votes1 (conflict->election);
ASSERT_NE (nullptr, votes1);
ASSERT_EQ (1, votes1->last_votes.size ());
lock.unlock ();
{
auto transaction (node1.store.tx_begin_read ());
ASSERT_TRUE (node1.store.block_exists (transaction, publish1.block->hash ()));
}
{
auto transaction (node2.store.tx_begin_read ());
ASSERT_TRUE (node2.store.block_exists (transaction, publish2.block->hash ()));
ASSERT_TRUE (node2.store.block_exists (transaction, publish3.block->hash ()));
}
system.deadline_set (10s);
auto done (false);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
done = node2.ledger.block_exists (publish1.block->hash ());
}
auto transaction1 (node1.store.tx_begin_read ());
auto transaction2 (node2.store.tx_begin_read ());
lock.lock ();
auto winner (*votes1->tally ().begin ());
ASSERT_EQ (*publish1.block, *winner.second);
ASSERT_EQ (nano::genesis_amount - 100, winner.first);
ASSERT_TRUE (node1.store.block_exists (transaction1, publish1.block->hash ()));
ASSERT_TRUE (node2.store.block_exists (transaction2, publish1.block->hash ()));
ASSERT_FALSE (node2.store.block_exists (transaction2, publish2.block->hash ()));
ASSERT_FALSE (node2.store.block_exists (transaction2, publish3.block->hash ()));
}
{
auto transaction (node2.store.tx_begin_read ());
ASSERT_TRUE (node2.store.block_exists (transaction, publish2.block->hash ()));
ASSERT_TRUE (node2.store.block_exists (transaction, publish3.block->hash ()));
}
system.deadline_set (10s);
auto done (false);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
done = node2.ledger.block_exists (publish1.block->hash ());
}
auto transaction1 (node1.store.tx_begin_read ());
auto transaction2 (node2.store.tx_begin_read ());
lock.lock ();
auto winner (*votes1->tally ().begin ());
ASSERT_EQ (*publish1.block, *winner.second);
ASSERT_EQ (nano::genesis_amount - 100, winner.first);
ASSERT_TRUE (node1.store.block_exists (transaction1, publish1.block->hash ()));
ASSERT_TRUE (node2.store.block_exists (transaction2, publish1.block->hash ()));
ASSERT_FALSE (node2.store.block_exists (transaction2, publish2.block->hash ()));
ASSERT_FALSE (node2.store.block_exists (transaction2, publish3.block->hash ()));
}
// Blocks that are no longer actively being voted on should be able to be evicted through bootstrapping.
@ -1657,7 +1693,17 @@ TEST (node, broadcast_elected)
std::vector<nano::transport::transport_type> types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp };
for (auto & type : types)
{
nano::system system (3, type);
nano::node_flags node_flags;
if (type == nano::transport::transport_type::tcp)
{
node_flags.disable_udp = true;
}
else
{
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
}
nano::system system (3, type, node_flags);
auto node0 (system.nodes[0]);
auto node1 (system.nodes[1]);
auto node2 (system.nodes[2]);
@ -2340,7 +2386,17 @@ TEST (node, block_confirm)
std::vector<nano::transport::transport_type> types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp };
for (auto & type : types)
{
nano::system system (2, type);
nano::node_flags node_flags;
if (type == nano::transport::transport_type::tcp)
{
node_flags.disable_udp = true;
}
else
{
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
}
nano::system system (2, type, node_flags);
auto & node1 (*system.nodes[0]);
auto & node2 (*system.nodes[1]);
nano::genesis genesis;
@ -2734,7 +2790,17 @@ TEST (node, vote_by_hash_republish)
std::vector<nano::transport::transport_type> types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp };
for (auto & type : types)
{
nano::system system (2, type);
nano::node_flags node_flags;
if (type == nano::transport::transport_type::tcp)
{
node_flags.disable_udp = true;
}
else
{
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
}
nano::system system (2, type, node_flags);
auto & node1 (*system.nodes[0]);
auto & node2 (*system.nodes[1]);
nano::keypair key2;

View file

@ -1391,7 +1391,14 @@ void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bo
{
if (add_to_peers)
{
node.network.udp_channels.insert (nano::transport::map_endpoint_to_v6 (endpoint_a), node.network_params.protocol.protocol_version);
if (!node.flags.disable_udp)
{
node.network.udp_channels.insert (nano::transport::map_endpoint_to_v6 (endpoint_a), node.network_params.protocol.protocol_version);
}
else if (!node.flags.disable_tcp_realtime)
{
node.network.merge_peer (nano::transport::map_endpoint_to_v6 (endpoint_a));
}
}
nano::unique_lock<std::mutex> lock (mutex);
if (!stopped)

View file

@ -14,6 +14,8 @@ port (port_a)
void nano::bootstrap_listener::start ()
{
nano::lock_guard<std::mutex> lock (mutex);
on = true;
listening_socket = std::make_shared<nano::server_socket> (node.shared (), boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.tcp_incoming_connections_max);
boost::system::error_code ec;
listening_socket->start (ec);
@ -22,6 +24,7 @@ void nano::bootstrap_listener::start ()
node.logger.try_log (boost::str (boost::format ("Error while binding for incoming TCP/bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ()));
throw std::runtime_error (ec.message ());
}
assert (node.network.endpoint ().port () == listening_socket->listening_port ());
listening_socket->on_connection ([this](std::shared_ptr<nano::socket> new_connection, boost::system::error_code const & ec_a) {
bool keep_accepting = true;
if (ec_a)
@ -47,6 +50,7 @@ void nano::bootstrap_listener::stop ()
}
if (listening_socket)
{
nano::lock_guard<std::mutex> lock (mutex);
listening_socket->close ();
listening_socket = nullptr;
}
@ -70,7 +74,15 @@ void nano::bootstrap_listener::accept_action (boost::system::error_code const &
boost::asio::ip::tcp::endpoint nano::bootstrap_listener::endpoint ()
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listening_socket->listening_port ());
nano::lock_guard<std::mutex> lock (mutex);
if (on && listening_socket)
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listening_socket->listening_port ());
}
else
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), 0);
}
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (bootstrap_listener & bootstrap_listener, const std::string & name)

View file

@ -23,7 +23,7 @@ public:
nano::tcp_endpoint endpoint ();
nano::node & node;
std::shared_ptr<nano::server_socket> listening_socket;
bool on;
bool on{ false };
std::atomic<size_t> bootstrap_count{ 0 };
std::atomic<size_t> realtime_count{ 0 };

View file

@ -16,11 +16,12 @@ limiter (node_a.config.bandwidth_limit),
node (node_a),
udp_channels (node_a, port_a),
tcp_channels (node_a),
port (port_a),
disconnect_observer ([]() {})
{
boost::thread::attributes attrs;
nano::thread_attributes::set (attrs);
for (size_t i = 0; i < node.config.network_threads; ++i)
for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_udp; ++i)
{
packet_processing_threads.emplace_back (attrs, [this]() {
nano::thread_role::set (nano::thread_role::name::packet_processing);
@ -68,6 +69,7 @@ void nano::network::start ()
if (!node.flags.disable_udp)
{
udp_channels.start ();
assert (udp_channels.get_local_endpoint ().port () == port);
}
if (!node.flags.disable_tcp_realtime)
{
@ -84,6 +86,7 @@ void nano::network::stop ()
tcp_channels.stop ();
resolver.cancel ();
buffer_container.stop ();
port = 0;
for (auto & thread : packet_processing_threads)
{
thread.join ();
@ -656,7 +659,7 @@ std::shared_ptr<nano::transport::channel> nano::network::find_node_id (nano::acc
nano::endpoint nano::network::endpoint ()
{
return udp_channels.get_local_endpoint ();
return nano::endpoint (boost::asio::ip::address_v6::loopback (), port);
}
void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff_a)

View file

@ -159,6 +159,7 @@ public:
nano::node & node;
nano::transport::udp_channels udp_channels;
nano::transport::tcp_channels tcp_channels;
std::atomic<uint16_t> port{ 0 };
std::function<void()> disconnect_observer;
// Called when a new channel is observed
std::function<void(std::shared_ptr<nano::transport::channel>)> channel_observer;

View file

@ -646,9 +646,11 @@ void nano::node::start ()
ongoing_rep_calculation ();
ongoing_peer_store ();
ongoing_online_weight_calculation_queue ();
if (config.tcp_incoming_connections_max > 0)
bool tcp_enabled (false);
if (config.tcp_incoming_connections_max > 0 && !(flags.disable_bootstrap_listener && flags.disable_tcp_realtime))
{
bootstrap.start ();
tcp_enabled = true;
}
if (!flags.disable_backup)
{
@ -663,7 +665,8 @@ void nano::node::start ()
this_l->bootstrap_wallet ();
});
}
if (config.external_address == boost::asio::ip::address_v6{}.any ().to_string ())
// Start port mapping if external address is not defined and TCP or UDP ports are enabled
if (config.external_address == boost::asio::ip::address_v6{}.any ().to_string () && (tcp_enabled || !flags.disable_udp))
{
port_mapping.start ();
}
@ -1352,6 +1355,9 @@ nano::node_flags const & nano::inactive_node_flag_defaults ()
node_flags.generate_cache.reps = false;
node_flags.generate_cache.cemented_count = false;
node_flags.generate_cache.unchecked_count = false;
node_flags.disable_udp = true;
node_flags.disable_bootstrap_listener = true;
node_flags.disable_tcp_realtime = true;
return node_flags;
}

View file

@ -109,14 +109,14 @@ nano::system::system ()
logging.init (nano::unique_path ());
}
nano::system::system (uint16_t count_a, nano::transport::transport_type type_a) :
nano::system::system (uint16_t count_a, nano::transport::transport_type type_a, nano::node_flags flags_a) :
system ()
{
nodes.reserve (count_a);
for (uint16_t i (0); i < count_a; ++i)
{
nano::node_config config (nano::get_available_port (), logging);
add_node (config, nano::node_flags (), type_a);
add_node (config, flags_a, type_a);
}
}

View file

@ -17,7 +17,7 @@ class system final
{
public:
system ();
system (uint16_t, nano::transport::transport_type = nano::transport::transport_type::tcp);
system (uint16_t, nano::transport::transport_type = nano::transport::transport_type::tcp, nano::node_flags = nano::node_flags ());
~system ();
void generate_activity (nano::node &, std::vector<nano::account> &);
void generate_mass_activity (uint32_t, nano::node &);

View file

@ -310,6 +310,7 @@ void nano::transport::tcp_channels::process_keepalive (nano::keepalive const & m
nano::endpoint new_endpoint (endpoint_a.address (), peer0.port ());
node.network.merge_peer (new_endpoint);
}
// Used to store sender endpoint information only
auto udp_channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a), node.network_params.protocol.protocol_version));
node.network.process_message (message_a, udp_channel);
}
@ -353,7 +354,7 @@ bool nano::transport::tcp_channels::reachout (nano::endpoint const & endpoint_a)
auto tcp_endpoint (nano::transport::map_endpoint_to_tcp (endpoint_a));
// Don't overload single IP
bool error = max_ip_connections (tcp_endpoint);
if (!error)
if (!error && !node.flags.disable_tcp_realtime)
{
// Don't keepalive to nodes that already sent us something
error |= find_channel (tcp_endpoint) != nullptr;

View file

@ -68,25 +68,35 @@ std::string nano::transport::channel_udp::to_string () const
nano::transport::udp_channels::udp_channels (nano::node & node_a, uint16_t port_a) :
node (node_a),
strand (node_a.io_ctx.get_executor ()),
socket (node_a.io_ctx, nano::endpoint (boost::asio::ip::address_v6::any (), port_a))
strand (node_a.io_ctx.get_executor ())
{
boost::system::error_code ec;
auto port (socket.local_endpoint (ec).port ());
if (ec)
if (!node.flags.disable_udp)
{
node.logger.try_log ("Unable to retrieve port: ", ec.message ());
socket = std::make_unique<boost::asio::ip::udp::socket> (node_a.io_ctx, nano::endpoint (boost::asio::ip::address_v6::any (), port_a));
boost::system::error_code ec;
auto port (socket->local_endpoint (ec).port ());
if (ec)
{
node.logger.try_log ("Unable to retrieve port: ", ec.message ());
}
local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), port);
}
else
{
local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), 0);
stopped = true;
}
local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), port);
}
void nano::transport::udp_channels::send (nano::shared_const_buffer const & buffer_a, nano::endpoint endpoint_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
{
boost::asio::post (strand,
[this, buffer_a, endpoint_a, callback_a]() {
this->socket.async_send_to (buffer_a, endpoint_a,
boost::asio::bind_executor (strand, callback_a));
if (!this->stopped)
{
this->socket->async_send_to (buffer_a, endpoint_a,
boost::asio::bind_executor (strand, callback_a));
}
});
}
@ -261,43 +271,48 @@ nano::tcp_endpoint nano::transport::udp_channels::bootstrap_peer (uint8_t connec
void nano::transport::udp_channels::receive ()
{
if (node.config.logging.network_packet_logging ())
if (!stopped)
{
node.logger.try_log ("Receiving packet");
}
auto data (node.network.buffer_container.allocate ());
socket.async_receive_from (boost::asio::buffer (data->buffer, nano::network::buffer_size), data->endpoint,
boost::asio::bind_executor (strand,
[this, data](boost::system::error_code const & error, std::size_t size_a) {
if (!error && !stopped)
release_assert (socket != nullptr);
if (node.config.logging.network_packet_logging ())
{
data->size = size_a;
this->node.network.buffer_container.enqueue (data);
this->receive ();
node.logger.try_log ("Receiving packet");
}
else
{
this->node.network.buffer_container.release (data);
if (error)
auto data (node.network.buffer_container.allocate ());
socket->async_receive_from (boost::asio::buffer (data->buffer, nano::network::buffer_size), data->endpoint,
boost::asio::bind_executor (strand,
[this, data](boost::system::error_code const & error, std::size_t size_a) {
if (!error && !this->stopped)
{
if (this->node.config.logging.network_logging ())
data->size = size_a;
this->node.network.buffer_container.enqueue (data);
this->receive ();
}
else
{
this->node.network.buffer_container.release (data);
if (error)
{
this->node.logger.try_log (boost::str (boost::format ("UDP Receive error: %1%") % error.message ()));
if (this->node.config.logging.network_logging ())
{
this->node.logger.try_log (boost::str (boost::format ("UDP Receive error: %1%") % error.message ()));
}
}
if (!this->stopped)
{
this->node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() { this->receive (); });
}
}
if (!stopped)
{
this->node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() { this->receive (); });
}
}
}));
}));
}
}
void nano::transport::udp_channels::start ()
{
for (size_t i = 0; i < node.config.io_threads; ++i)
assert (!node.flags.disable_udp);
for (size_t i = 0; i < node.config.io_threads && !stopped; ++i)
{
boost::asio::post (strand, [this]() {
receive ();
@ -309,29 +324,33 @@ void nano::transport::udp_channels::start ()
void nano::transport::udp_channels::stop ()
{
// Stop and invalidate local endpoint
stopped = true;
nano::lock_guard<std::mutex> lock (mutex);
local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), 0);
if (!stopped.exchange (true))
{
nano::lock_guard<std::mutex> lock (mutex);
local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), 0);
// On test-net, close directly to avoid address-reuse issues. On livenet, close
// through the strand as multiple IO threads may access the socket.
if (node.network_params.network.is_test_network ())
{
this->close_socket ();
}
else
{
boost::asio::dispatch (strand, [this] {
// On test-net, close directly to avoid address-reuse issues. On livenet, close
// through the strand as multiple IO threads may access the socket.
if (node.network_params.network.is_test_network ())
{
this->close_socket ();
});
}
else
{
boost::asio::dispatch (strand, [this] {
this->close_socket ();
});
}
}
}
void nano::transport::udp_channels::close_socket ()
{
boost::system::error_code ignored;
this->socket.close (ignored);
this->local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), 0);
if (this->socket != nullptr)
{
boost::system::error_code ignored;
this->socket->close (ignored);
}
}
nano::endpoint nano::transport::udp_channels::get_local_endpoint () const
@ -587,7 +606,7 @@ bool nano::transport::udp_channels::reachout (nano::endpoint const & endpoint_a)
{
// Don't overload single IP
bool error = max_ip_connections (endpoint_a);
if (!error)
if (!error && !node.flags.disable_udp)
{
auto endpoint_l (nano::transport::map_endpoint_to_v6 (endpoint_a));
// Don't keepalive to nodes that already sent us something

View file

@ -181,7 +181,7 @@ namespace transport
attempts;
// clang-format on
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::udp::socket socket;
std::unique_ptr<boost::asio::ip::udp::socket> socket;
nano::endpoint local_endpoint;
std::atomic<bool> stopped{ false };
};