Move common UDP/TCP channels functions to transport (#1971)

- common tcp/udp channels functions moved to transport.hpp
- added last_packet_sent time_point
- RPC peers modified to return channel type with option "peer_details"
- rename last_tcp_attempt to last_bootstrap_attempt
- move reserved_address () to nano::transport, reachout to nano::network
- add set_last_packet_sent to send_buffer
This commit is contained in:
Sergey Kroshnin 2019-05-14 16:54:02 +03:00 committed by GitHub
commit a691823d98
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 395 additions and 263 deletions

View file

@ -73,7 +73,7 @@ TEST (network, send_node_id_handshake)
system.nodes.push_back (node1);
auto initial (system.nodes[0]->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in));
auto initial_node1 (node1->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in));
nano::transport::channel_udp channel (system.nodes[0]->network.udp_channels, node1->network.endpoint ());
auto channel (std::make_shared<nano::transport::channel_udp> (system.nodes[0]->network.udp_channels, node1->network.endpoint ()));
system.nodes[0]->network.send_keepalive (channel);
ASSERT_EQ (0, system.nodes[0]->network.size ());
ASSERT_EQ (0, node1->network.size ());
@ -106,7 +106,7 @@ TEST (network, last_contacted)
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work));
node1->start ();
system.nodes.push_back (node1);
nano::transport::channel_udp channel1 (node1->network.udp_channels, nano::endpoint (boost::asio::ip::address_v6::loopback (), 24000));
auto channel1 (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, nano::endpoint (boost::asio::ip::address_v6::loopback (), 24000)));
node1->network.send_keepalive (channel1);
system.deadline_set (10s);
@ -143,7 +143,7 @@ TEST (network, multi_keepalive)
node1->start ();
system.nodes.push_back (node1);
ASSERT_EQ (0, node1->network.size ());
nano::transport::channel_udp channel1 (node1->network.udp_channels, system.nodes[0]->network.endpoint ());
auto channel1 (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, system.nodes[0]->network.endpoint ()));
node1->network.send_keepalive (channel1);
ASSERT_EQ (0, node1->network.size ());
ASSERT_EQ (0, system.nodes[0]->network.size ());
@ -157,7 +157,7 @@ TEST (network, multi_keepalive)
ASSERT_FALSE (init2.error ());
node2->start ();
system.nodes.push_back (node2);
nano::transport::channel_udp channel2 (node2->network.udp_channels, system.nodes[0]->network.endpoint ());
auto channel2 (std::make_shared<nano::transport::channel_udp> (node2->network.udp_channels, system.nodes[0]->network.endpoint ()));
node2->network.send_keepalive (channel2);
system.deadline_set (10s);
while (node1->network.size () != 2 || system.nodes[0]->network.size () != 2 || node2->network.size () != 2)
@ -1160,12 +1160,12 @@ TEST (network, endpoint_bad_fd)
TEST (network, reserved_address)
{
nano::system system (24000, 1);
ASSERT_FALSE (system.nodes[0]->network.udp_channels.reserved_address (nano::endpoint (boost::asio::ip::address_v6::from_string ("2001::"), 0)));
ASSERT_FALSE (nano::transport::reserved_address (nano::endpoint (boost::asio::ip::address_v6::from_string ("2001::"), 0)));
nano::endpoint loopback (boost::asio::ip::address_v6::from_string ("::1"), 1);
ASSERT_FALSE (system.nodes[0]->network.udp_channels.reserved_address (loopback));
ASSERT_FALSE (nano::transport::reserved_address (loopback));
nano::endpoint private_network_peer (boost::asio::ip::address_v6::from_string ("::ffff:10.0.0.0"), 1);
ASSERT_TRUE (system.nodes[0]->network.udp_channels.reserved_address (private_network_peer, false));
ASSERT_FALSE (system.nodes[0]->network.udp_channels.reserved_address (private_network_peer, true));
ASSERT_TRUE (nano::transport::reserved_address (private_network_peer, false));
ASSERT_FALSE (nano::transport::reserved_address (private_network_peer, true));
}
TEST (node, port_mapping)
@ -2032,7 +2032,7 @@ TEST (network, replace_port)
}
auto peers_list (system.nodes[0]->network.udp_channels.list (std::numeric_limits<size_t>::max ()));
ASSERT_EQ (peers_list[0]->get_node_id ().get (), node1->node_id.pub);
nano::transport::channel_udp channel (system.nodes[0]->network.udp_channels, node1->network.endpoint ());
auto channel (std::make_shared<nano::transport::channel_udp> (system.nodes[0]->network.udp_channels, node1->network.endpoint ()));
system.nodes[0]->network.send_keepalive (channel);
system.deadline_set (5s);
while (!system.nodes[0]->network.udp_channels.channel (node1->network.endpoint ()))

View file

@ -212,7 +212,7 @@ TEST (node, node_receive_quorum)
nano::system system2 (24001, 1);
system2.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
ASSERT_TRUE (system.nodes[0]->balance (key.pub).is_zero ());
nano::transport::channel_udp channel (system.nodes[0]->network.udp_channels, system2.nodes[0]->network.endpoint ());
auto channel (std::make_shared<nano::transport::channel_udp> (system.nodes[0]->network.udp_channels, system2.nodes[0]->network.endpoint ()));
system.nodes[0]->network.send_keepalive (channel);
while (system.nodes[0]->balance (key.pub).is_zero ())
{
@ -236,7 +236,7 @@ TEST (node, auto_bootstrap)
nano::node_init init1;
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work));
ASSERT_FALSE (init1.error ());
nano::transport::channel_udp channel (node1->network.udp_channels, system.nodes[0]->network.endpoint ());
auto channel (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, system.nodes[0]->network.endpoint ()));
node1->network.send_keepalive (channel);
node1->start ();
system.nodes.push_back (node1);
@ -267,7 +267,7 @@ TEST (node, auto_bootstrap_reverse)
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work));
ASSERT_FALSE (init1.error ());
ASSERT_NE (nullptr, system.wallet (0)->send_action (nano::test_genesis_key.pub, key2.pub, system.nodes[0]->config.receive_minimum.number ()));
nano::transport::channel_udp channel (system.nodes[0]->network.udp_channels, node1->network.endpoint ());
auto channel (std::make_shared<nano::transport::channel_udp> (system.nodes[0]->network.udp_channels, node1->network.endpoint ()));
system.nodes[0]->network.send_keepalive (channel);
node1->start ();
system.nodes.push_back (node1);
@ -443,8 +443,8 @@ TEST (node, connect_after_junk)
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work));
auto junk_buffer (std::make_shared<std::vector<uint8_t>> ());
junk_buffer->push_back (0);
nano::transport::channel_udp channel1 (node1->network.udp_channels, system.nodes[0]->network.endpoint ());
channel1.send_buffer (junk_buffer, nano::stat::detail::bulk_pull, [](boost::system::error_code const &, size_t) {});
auto channel1 (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, system.nodes[0]->network.endpoint ()));
channel1->send_buffer (junk_buffer, nano::stat::detail::bulk_pull, [](boost::system::error_code const &, size_t) {});
system.deadline_set (10s);
while (system.nodes[0]->stats.count (nano::stat::type::error) == 0)
{
@ -452,7 +452,7 @@ TEST (node, connect_after_junk)
}
node1->start ();
system.nodes.push_back (node1);
nano::transport::channel_udp channel2 (node1->network.udp_channels, system.nodes[0]->network.endpoint ());
auto channel2 (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, system.nodes[0]->network.endpoint ()));
node1->network.send_keepalive (channel2);
system.deadline_set (10s);
while (node1->network.empty ())
@ -1189,7 +1189,7 @@ TEST (node, fork_bootstrap_flip)
auto transaction (node2.store.tx_begin_read ());
ASSERT_TRUE (node2.store.block_exists (transaction, send2->hash ()));
}
nano::transport::channel_udp channel (node1.network.udp_channels, node2.network.endpoint ());
auto channel (std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node2.network.endpoint ()));
node1.network.send_keepalive (channel);
system1.deadline_set (50s);
while (node2.network.empty ())

View file

@ -133,17 +133,17 @@ TEST (peer_container, reachout)
nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), 24000);
// Make sure having been contacted by them already indicates we shouldn't reach out
system.nodes[0]->network.udp_channels.insert (endpoint0, nano::protocol_version);
ASSERT_TRUE (system.nodes[0]->network.udp_channels.reachout (endpoint0));
ASSERT_TRUE (system.nodes[0]->network.reachout (endpoint0));
nano::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), 24001);
ASSERT_FALSE (system.nodes[0]->network.udp_channels.reachout (endpoint1));
ASSERT_FALSE (system.nodes[0]->network.reachout (endpoint1));
// Reaching out to them once should signal we shouldn't reach out again.
ASSERT_TRUE (system.nodes[0]->network.udp_channels.reachout (endpoint1));
ASSERT_TRUE (system.nodes[0]->network.reachout (endpoint1));
// Make sure we don't purge new items
system.nodes[0]->network.cleanup (std::chrono::steady_clock::now () - std::chrono::seconds (10));
ASSERT_TRUE (system.nodes[0]->network.udp_channels.reachout (endpoint1));
ASSERT_TRUE (system.nodes[0]->network.reachout (endpoint1));
// Make sure we purge old items
system.nodes[0]->network.cleanup (std::chrono::steady_clock::now () + std::chrono::seconds (10));
ASSERT_FALSE (system.nodes[0]->network.udp_channels.reachout (endpoint1));
ASSERT_FALSE (system.nodes[0]->network.reachout (endpoint1));
}
TEST (peer_container, depeer)

View file

@ -2481,7 +2481,7 @@ void nano::json_handler::peers ()
if (peer_details)
{
boost::property_tree::ptree pending_tree;
pending_tree.put ("protocol_version", std::to_string (channel->network_version));
pending_tree.put ("protocol_version", std::to_string (channel->get_network_version ()));
auto node_id_l (channel->get_node_id ());
if (node_id_l.is_initialized ())
{
@ -2491,11 +2491,12 @@ void nano::json_handler::peers ()
{
pending_tree.put ("node_id", "");
}
pending_tree.put ("type", channel->get_type () == nano::transport::transport_type::tcp ? "tcp" : "udp");
peers_l.push_back (boost::property_tree::ptree::value_type (text.str (), pending_tree));
}
else
{
peers_l.push_back (boost::property_tree::ptree::value_type (text.str (), boost::property_tree::ptree (std::to_string (channel->network_version))));
peers_l.push_back (boost::property_tree::ptree::value_type (text.str (), boost::property_tree::ptree (std::to_string (channel->get_network_version ()))));
}
}
response_l.add_child ("peers", peers_l);

View file

@ -95,14 +95,14 @@ void nano::network::stop ()
buffer_container.stop ();
}
void nano::network::send_keepalive (nano::transport::channel const & channel_a)
void nano::network::send_keepalive (std::shared_ptr<nano::transport::channel> channel_a)
{
nano::keepalive message;
udp_channels.random_fill (message.peers);
channel_a.send (message);
channel_a->send (message);
}
void nano::network::send_keepalive_self (nano::transport::channel const & channel_a)
void nano::network::send_keepalive_self (std::shared_ptr<nano::transport::channel> channel_a)
{
nano::keepalive message;
udp_channels.random_fill (message.peers);
@ -116,7 +116,7 @@ void nano::network::send_keepalive_self (nano::transport::channel const & channe
message.peers[1] = node.port_mapping.external_address ();
message.peers[2] = nano::endpoint (boost::asio::ip::address_v6{}, node.port_mapping.external_address ().port ()); // If UPnP reported wrong external IP address
}
channel_a.send (message);
channel_a->send (message);
}
void nano::node::keepalive (std::string const & address_a, uint16_t port_a)
@ -128,7 +128,11 @@ void nano::node::keepalive (std::string const & address_a, uint16_t port_a)
for (auto i (i_a), n (boost::asio::ip::udp::resolver::iterator{}); i != n; ++i)
{
auto endpoint (nano::transport::map_endpoint_to_v6 (i->endpoint ()));
nano::transport::channel_udp channel (node_l->network.udp_channels, endpoint);
auto channel (node_l->network.find_channel (endpoint));
if (!channel)
{
channel = std::make_shared<nano::transport::channel_udp> (node_l->network.udp_channels, endpoint);
}
node_l->network.send_keepalive (channel);
}
}
@ -139,7 +143,7 @@ void nano::node::keepalive (std::string const & address_a, uint16_t port_a)
});
}
void nano::network::send_node_id_handshake (nano::endpoint const & endpoint_a, boost::optional<nano::uint256_union> const & query, boost::optional<nano::uint256_union> const & respond_to)
void nano::network::send_node_id_handshake (std::shared_ptr<nano::transport::channel> channel_a, boost::optional<nano::uint256_union> const & query, boost::optional<nano::uint256_union> const & respond_to)
{
boost::optional<std::pair<nano::account, nano::signature>> response (boost::none);
if (respond_to)
@ -150,10 +154,9 @@ void nano::network::send_node_id_handshake (nano::endpoint const & endpoint_a, b
nano::node_id_handshake message (query, response);
if (node.config.logging.network_node_id_handshake_logging ())
{
node.logger.try_log (boost::str (boost::format ("Node ID handshake sent with node ID %1% to %2%: query %3%, respond_to %4% (signature %5%)") % node.node_id.pub.to_account () % endpoint_a % (query ? query->to_string () : std::string ("[none]")) % (respond_to ? respond_to->to_string () : std::string ("[none]")) % (response ? response->second.to_string () : std::string ("[none]"))));
node.logger.try_log (boost::str (boost::format ("Node ID handshake sent with node ID %1% to %2%: query %3%, respond_to %4% (signature %5%)") % node.node_id.pub.to_account () % channel_a->get_endpoint () % (query ? query->to_string () : std::string ("[none]")) % (respond_to ? respond_to->to_string () : std::string ("[none]")) % (response ? response->second.to_string () : std::string ("[none]"))));
}
nano::transport::channel_udp channel (udp_channels, endpoint_a);
channel.send (message);
channel_a->send (message);
}
template <typename T>
@ -175,7 +178,7 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a
auto vote_bytes = confirm.to_bytes ();
for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j)
{
j->get ().send_buffer (vote_bytes, nano::stat::detail::confirm_ack);
j->get ()->send_buffer (vote_bytes, nano::stat::detail::confirm_ack);
}
node_a.votes_cache.add (vote);
});
@ -189,7 +192,7 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a
auto vote_bytes = confirm.to_bytes ();
for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j)
{
j->get ().send_buffer (vote_bytes, nano::stat::detail::confirm_ack);
j->get ()->send_buffer (vote_bytes, nano::stat::detail::confirm_ack);
}
}
}
@ -200,21 +203,21 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a
auto publish_bytes (publish.to_bytes ());
for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j)
{
j->get ().send_buffer (publish_bytes, nano::stat::detail::publish);
j->get ()->send_buffer (publish_bytes, nano::stat::detail::publish);
}
}
}
return result;
}
bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a, nano::transport::channel const & channel_a, std::shared_ptr<nano::block> block_a, bool also_publish)
bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a, std::shared_ptr<nano::transport::channel> channel_a, std::shared_ptr<nano::block> block_a, bool also_publish)
{
std::array<std::reference_wrapper<nano::transport::channel const>, 1> endpoints = { channel_a };
std::array<std::shared_ptr<nano::transport::channel>, 1> endpoints = { channel_a };
auto result (confirm_block (transaction_a, node_a, endpoints, std::move (block_a), also_publish));
return result;
}
void nano::network::confirm_hashes (nano::transaction const & transaction_a, nano::transport::channel const & channel_a, std::vector<nano::block_hash> blocks_bundle_a)
void nano::network::confirm_hashes (nano::transaction const & transaction_a, std::shared_ptr<nano::transport::channel> channel_a, std::vector<nano::block_hash> blocks_bundle_a)
{
if (node.config.enable_voting)
{
@ -226,13 +229,13 @@ void nano::network::confirm_hashes (nano::transaction const & transaction_a, nan
nano::vectorstream stream (*bytes);
confirm.serialize (stream);
}
channel_a.send_buffer (bytes, nano::stat::detail::confirm_ack);
channel_a->send_buffer (bytes, nano::stat::detail::confirm_ack);
this->node.votes_cache.add (vote);
});
}
}
bool nano::network::send_votes_cache (nano::transport::channel const & channel_a, nano::block_hash const & hash_a)
bool nano::network::send_votes_cache (std::shared_ptr<nano::transport::channel> channel_a, nano::block_hash const & hash_a)
{
// Search in cache
auto votes (node.votes_cache.find (hash_a));
@ -241,7 +244,7 @@ bool nano::network::send_votes_cache (nano::transport::channel const & channel_a
{
nano::confirm_ack confirm (vote);
auto vote_bytes = confirm.to_bytes ();
channel_a.send_buffer (vote_bytes, nano::stat::detail::confirm_ack);
channel_a->send_buffer (vote_bytes, nano::stat::detail::confirm_ack);
}
// Returns true if votes were sent
bool result (!votes.empty ());
@ -316,7 +319,8 @@ void nano::network::broadcast_confirm_req_base (std::shared_ptr<nano::block> blo
while (!endpoints_a->empty () && count < max_reps)
{
nano::confirm_req req (block_a);
endpoints_a->back ()->send (req);
auto channel (endpoints_a->back ());
channel->send (req);
endpoints_a->pop_back ();
count++;
}
@ -449,14 +453,14 @@ public:
if (message_a.block != nullptr)
{
auto hash (message_a.block->hash ());
if (!node.network.send_votes_cache (*channel, hash))
if (!node.network.send_votes_cache (channel, hash))
{
auto transaction (node.store.tx_begin_read ());
auto successor (node.ledger.successor (transaction, message_a.block->qualified_root ()));
if (successor != nullptr)
{
auto same_block (successor->hash () == hash);
confirm_block (transaction, node, std::cref (*channel), std::move (successor), !same_block);
confirm_block (transaction, node, channel, std::move (successor), !same_block);
}
}
}
@ -466,7 +470,7 @@ public:
std::vector<nano::block_hash> blocks_bundle;
for (auto & root_hash : message_a.roots_hashes)
{
if (!node.network.send_votes_cache (*channel, root_hash.first) && node.store.block_exists (transaction, root_hash.first))
if (!node.network.send_votes_cache (channel, root_hash.first) && node.store.block_exists (transaction, root_hash.first))
{
blocks_bundle.push_back (root_hash.first);
}
@ -485,7 +489,7 @@ public:
}
if (!successor.is_zero ())
{
if (!node.network.send_votes_cache (*channel, successor))
if (!node.network.send_votes_cache (channel, successor))
{
blocks_bundle.push_back (successor);
}
@ -498,7 +502,7 @@ public:
}
if (!blocks_bundle.empty ())
{
node.network.confirm_hashes (transaction, *channel, blocks_bundle);
node.network.confirm_hashes (transaction, channel, blocks_bundle);
}
}
}
@ -560,13 +564,53 @@ void nano::network::merge_peers (std::array<nano::endpoint, 8> const & peers_a)
void nano::network::merge_peer (nano::endpoint const & peer_a)
{
if (!udp_channels.reachout (peer_a, node.config.allow_local_peers))
if (!reachout (peer_a, node.config.allow_local_peers))
{
nano::transport::channel_udp channel (node.network.udp_channels, peer_a);
auto channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, peer_a));
send_keepalive (channel);
}
}
bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_local_peers)
{
bool result (false);
if (endpoint_a.address ().to_v6 ().is_unspecified ())
{
result = true;
}
else if (nano::transport::reserved_address (endpoint_a, allow_local_peers))
{
result = true;
}
else if (endpoint_a == endpoint ())
{
result = true;
}
else if (!node.network_params.network.is_test_network ())
{
result = true;
}
return result;
}
bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_local_peers)
{
// Don't contact invalid IPs
bool error = not_a_peer (endpoint_a, allow_local_peers);
if (!error)
{
error |= udp_channels.reachout (endpoint_a);
}
return error;
}
std::shared_ptr<nano::transport::channel> nano::network::find_channel (nano::endpoint const & endpoint_a)
{
std::shared_ptr<nano::transport::channel> result;
result = udp_channels.channel (endpoint_a);
return result;
}
bool nano::operation::operator> (nano::operation const & other_a) const
{
return wakeup > other_a.wakeup;
@ -1169,7 +1213,7 @@ startup_time (std::chrono::steady_clock::now ())
});
}
observers.endpoint.add ([this](std::shared_ptr<nano::transport::channel> channel_a) {
this->network.send_keepalive (*channel_a);
this->network.send_keepalive (channel_a);
});
observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a) {
this->gap_cache.vote (vote_a);
@ -2190,10 +2234,10 @@ void nano::node::add_initial_peers ()
for (auto i (store.peers_begin (transaction)), n (store.peers_end ()); i != n; ++i)
{
nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ());
if (!network.udp_channels.reachout (endpoint, config.allow_local_peers))
if (!network.reachout (endpoint, config.allow_local_peers))
{
auto channel (std::make_shared<nano::transport::channel_udp> (network.udp_channels, endpoint));
network.send_keepalive (*channel);
network.send_keepalive (channel);
rep_crawler.query (channel);
}
}

View file

@ -253,15 +253,19 @@ public:
void flood_block_batch (std::deque<std::shared_ptr<nano::block>>, unsigned = broadcast_interval_ms);
void merge_peers (std::array<nano::endpoint, 8> const &);
void merge_peer (nano::endpoint const &);
void send_keepalive (nano::transport::channel const &);
void send_keepalive_self (nano::transport::channel const &);
void send_node_id_handshake (nano::endpoint const &, boost::optional<nano::uint256_union> const & query, boost::optional<nano::uint256_union> const & respond_to);
void send_keepalive (std::shared_ptr<nano::transport::channel>);
void send_keepalive_self (std::shared_ptr<nano::transport::channel>);
void send_node_id_handshake (std::shared_ptr<nano::transport::channel>, boost::optional<nano::uint256_union> const & query, boost::optional<nano::uint256_union> const & respond_to);
void broadcast_confirm_req (std::shared_ptr<nano::block>);
void broadcast_confirm_req_base (std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>, unsigned, bool = false);
void broadcast_confirm_req_batch (std::unordered_map<std::shared_ptr<nano::transport::channel>, std::vector<std::pair<nano::block_hash, nano::block_hash>>>, unsigned = broadcast_interval_ms, bool = false);
void broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>>, unsigned = broadcast_interval_ms);
void confirm_hashes (nano::transaction const &, nano::transport::channel const &, std::vector<nano::block_hash>);
bool send_votes_cache (nano::transport::channel const &, nano::block_hash const &);
void confirm_hashes (nano::transaction const &, std::shared_ptr<nano::transport::channel>, std::vector<nano::block_hash>);
bool send_votes_cache (std::shared_ptr<nano::transport::channel>, nano::block_hash const &);
std::shared_ptr<nano::transport::channel> find_channel (nano::endpoint const &);
bool not_a_peer (nano::endpoint const &, bool);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
nano::endpoint endpoint ();
void cleanup (std::chrono::steady_clock::time_point const &);
void ongoing_cleanup ();

View file

@ -43,7 +43,7 @@ std::shared_ptr<nano::node> nano::system::add_node (nano::node_config const & no
decltype (starting1) new1;
auto starting2 (node2->network.size ());
decltype (starting2) new2;
nano::transport::channel_udp channel ((*j)->network.udp_channels, (*i)->network.endpoint ());
auto channel (std::make_shared<nano::transport::channel_udp> ((*j)->network.udp_channels, (*i)->network.endpoint ()));
(*j)->network.send_keepalive (channel);
do
{

View file

@ -7,6 +7,15 @@ socket (socket_a)
{
}
nano::transport::channel_tcp::~channel_tcp ()
{
std::lock_guard<std::mutex> lk (channel_mutex);
if (socket)
{
socket->close ();
}
}
size_t nano::transport::channel_tcp::hash_code () const
{
std::hash<::nano::tcp_endpoint> hash;
@ -24,8 +33,9 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const &
return result;
}
void nano::transport::channel_tcp::send_buffer (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
void nano::transport::channel_tcp::send_buffer (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
{
set_last_packet_sent (std::chrono::steady_clock::now ());
socket->async_write (buffer_a, callback (buffer_a, detail_a, callback_a));
}
@ -51,5 +61,5 @@ std::function<void(boost::system::error_code const &, size_t)> nano::transport::
std::string nano::transport::channel_tcp::to_string () const
{
return boost::str (boost::format ("TCP: %1%") % socket->remote_endpoint ());
return boost::str (boost::format ("%1%") % socket->remote_endpoint ());
}

View file

@ -12,9 +12,10 @@ namespace transport
{
public:
channel_tcp (nano::node &, std::shared_ptr<nano::socket>);
~channel_tcp ();
size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) override;
std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
std::string to_string () const override;
bool operator== (nano::transport::channel_tcp const & other_a) const
@ -22,6 +23,37 @@ namespace transport
return &node == &other_a.node && socket == other_a.socket;
}
std::shared_ptr<nano::socket> socket;
nano::endpoint get_endpoint () const override
{
std::lock_guard<std::mutex> lk (channel_mutex);
if (socket)
{
return nano::transport::map_tcp_to_endpoint (socket->remote_endpoint ());
}
else
{
return nano::endpoint (boost::asio::ip::address_v6::any (), 0);
}
}
nano::tcp_endpoint get_tcp_endpoint () const override
{
std::lock_guard<std::mutex> lk (channel_mutex);
if (socket)
{
return socket->remote_endpoint ();
}
else
{
return nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0);
}
}
nano::transport::transport_type get_type () const override
{
return nano::transport::transport_type::tcp;
}
};
} // namespace transport
} // namespace nano

View file

@ -57,12 +57,22 @@ nano::endpoint nano::transport::map_endpoint_to_v6 (nano::endpoint const & endpo
return endpoint_l;
}
nano::endpoint nano::transport::map_tcp_to_endpoint (nano::tcp_endpoint const & endpoint_a)
{
return nano::endpoint (endpoint_a.address (), endpoint_a.port ());
}
nano::tcp_endpoint nano::transport::map_endpoint_to_tcp (nano::endpoint const & endpoint_a)
{
return nano::tcp_endpoint (endpoint_a.address (), endpoint_a.port ());
}
nano::transport::channel::channel (nano::node & node_a) :
node (node_a)
{
}
void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
{
callback_visitor visitor;
message_a.visit (visitor);
@ -71,3 +81,106 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct
send_buffer (buffer, detail, callback_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
}
namespace
{
boost::asio::ip::address_v6 mapped_from_v4_bytes (unsigned long address_a)
{
return boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (address_a));
}
}
bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool allow_local_peers)
{
assert (endpoint_a.address ().is_v6 ());
auto bytes (endpoint_a.address ().to_v6 ());
auto result (false);
static auto const rfc1700_min (mapped_from_v4_bytes (0x00000000ul));
static auto const rfc1700_max (mapped_from_v4_bytes (0x00fffffful));
static auto const rfc1918_1_min (mapped_from_v4_bytes (0x0a000000ul));
static auto const rfc1918_1_max (mapped_from_v4_bytes (0x0afffffful));
static auto const rfc1918_2_min (mapped_from_v4_bytes (0xac100000ul));
static auto const rfc1918_2_max (mapped_from_v4_bytes (0xac1ffffful));
static auto const rfc1918_3_min (mapped_from_v4_bytes (0xc0a80000ul));
static auto const rfc1918_3_max (mapped_from_v4_bytes (0xc0a8fffful));
static auto const rfc6598_min (mapped_from_v4_bytes (0x64400000ul));
static auto const rfc6598_max (mapped_from_v4_bytes (0x647ffffful));
static auto const rfc5737_1_min (mapped_from_v4_bytes (0xc0000200ul));
static auto const rfc5737_1_max (mapped_from_v4_bytes (0xc00002fful));
static auto const rfc5737_2_min (mapped_from_v4_bytes (0xc6336400ul));
static auto const rfc5737_2_max (mapped_from_v4_bytes (0xc63364fful));
static auto const rfc5737_3_min (mapped_from_v4_bytes (0xcb007100ul));
static auto const rfc5737_3_max (mapped_from_v4_bytes (0xcb0071fful));
static auto const ipv4_multicast_min (mapped_from_v4_bytes (0xe0000000ul));
static auto const ipv4_multicast_max (mapped_from_v4_bytes (0xeffffffful));
static auto const rfc6890_min (mapped_from_v4_bytes (0xf0000000ul));
static auto const rfc6890_max (mapped_from_v4_bytes (0xfffffffful));
static auto const rfc6666_min (boost::asio::ip::address_v6::from_string ("100::"));
static auto const rfc6666_max (boost::asio::ip::address_v6::from_string ("100::ffff:ffff:ffff:ffff"));
static auto const rfc3849_min (boost::asio::ip::address_v6::from_string ("2001:db8::"));
static auto const rfc3849_max (boost::asio::ip::address_v6::from_string ("2001:db8:ffff:ffff:ffff:ffff:ffff:ffff"));
static auto const rfc4193_min (boost::asio::ip::address_v6::from_string ("fc00::"));
static auto const rfc4193_max (boost::asio::ip::address_v6::from_string ("fd00:ffff:ffff:ffff:ffff:ffff:ffff:ffff"));
static auto const ipv6_multicast_min (boost::asio::ip::address_v6::from_string ("ff00::"));
static auto const ipv6_multicast_max (boost::asio::ip::address_v6::from_string ("ff00:ffff:ffff:ffff:ffff:ffff:ffff:ffff"));
if (bytes >= rfc1700_min && bytes <= rfc1700_max)
{
result = true;
}
else if (bytes >= rfc5737_1_min && bytes <= rfc5737_1_max)
{
result = true;
}
else if (bytes >= rfc5737_2_min && bytes <= rfc5737_2_max)
{
result = true;
}
else if (bytes >= rfc5737_3_min && bytes <= rfc5737_3_max)
{
result = true;
}
else if (bytes >= ipv4_multicast_min && bytes <= ipv4_multicast_max)
{
result = true;
}
else if (bytes >= rfc6890_min && bytes <= rfc6890_max)
{
result = true;
}
else if (bytes >= rfc6666_min && bytes <= rfc6666_max)
{
result = true;
}
else if (bytes >= rfc3849_min && bytes <= rfc3849_max)
{
result = true;
}
else if (bytes >= ipv6_multicast_min && bytes <= ipv6_multicast_max)
{
result = true;
}
else if (!allow_local_peers)
{
if (bytes >= rfc1918_1_min && bytes <= rfc1918_1_max)
{
result = true;
}
else if (bytes >= rfc1918_2_min && bytes <= rfc1918_2_max)
{
result = true;
}
else if (bytes >= rfc1918_3_min && bytes <= rfc1918_3_max)
{
result = true;
}
else if (bytes >= rfc6598_min && bytes <= rfc6598_max)
{
result = true;
}
else if (bytes >= rfc4193_min && bytes <= rfc4193_max)
{
result = true;
}
}
return result;
}

View file

@ -11,6 +11,16 @@ namespace transport
{
class message;
nano::endpoint map_endpoint_to_v6 (nano::endpoint const &);
nano::endpoint map_tcp_to_endpoint (nano::tcp_endpoint const &);
nano::tcp_endpoint map_endpoint_to_tcp (nano::endpoint const &);
// Unassigned, reserved, self
bool reserved_address (nano::endpoint const &, bool = false);
enum class transport_type : uint8_t
{
undefined = 0,
udp = 1,
tcp = 2
};
class channel
{
public:
@ -18,10 +28,80 @@ namespace transport
virtual ~channel () = default;
virtual size_t hash_code () const = 0;
virtual bool operator== (nano::transport::channel const &) const = 0;
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const;
virtual void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const = 0;
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr);
virtual void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) = 0;
virtual std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const = 0;
virtual std::string to_string () const = 0;
virtual nano::endpoint get_endpoint () const = 0;
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;
virtual nano::transport::transport_type get_type () const = 0;
std::chrono::steady_clock::time_point get_last_bootstrap_attempt () const
{
std::lock_guard<std::mutex> lk (channel_mutex);
return last_bootstrap_attempt;
}
void set_last_bootstrap_attempt (std::chrono::steady_clock::time_point const time_a)
{
std::lock_guard<std::mutex> lk (channel_mutex);
last_bootstrap_attempt = time_a;
}
std::chrono::steady_clock::time_point get_last_packet_received () const
{
std::lock_guard<std::mutex> lk (channel_mutex);
return last_packet_received;
}
void set_last_packet_received (std::chrono::steady_clock::time_point const time_a)
{
std::lock_guard<std::mutex> lk (channel_mutex);
last_packet_received = time_a;
}
std::chrono::steady_clock::time_point get_last_packet_sent () const
{
std::lock_guard<std::mutex> lk (channel_mutex);
return last_packet_sent;
}
void set_last_packet_sent (std::chrono::steady_clock::time_point const time_a)
{
std::lock_guard<std::mutex> lk (channel_mutex);
last_packet_sent = time_a;
}
boost::optional<nano::account> get_node_id () const
{
std::lock_guard<std::mutex> lk (channel_mutex);
return node_id;
}
void set_node_id (nano::account node_id_a)
{
std::lock_guard<std::mutex> lk (channel_mutex);
node_id = node_id_a;
}
unsigned get_network_version () const
{
return network_version;
}
void set_network_version (unsigned network_version_a)
{
network_version = network_version_a;
}
mutable std::mutex channel_mutex;
private:
std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () };
std::chrono::steady_clock::time_point last_packet_received{ std::chrono::steady_clock::time_point () };
std::chrono::steady_clock::time_point last_packet_sent{ std::chrono::steady_clock::time_point () };
boost::optional<nano::account> node_id{ boost::none };
std::atomic<unsigned> network_version{ nano::protocol_version };
protected:
nano::node & node;

View file

@ -6,10 +6,10 @@ std::chrono::seconds constexpr nano::transport::udp_channels::syn_cookie_cutoff;
nano::transport::channel_udp::channel_udp (nano::transport::udp_channels & channels_a, nano::endpoint const & endpoint_a, unsigned network_version_a) :
channel (channels_a.node),
network_version (network_version_a),
endpoint (endpoint_a),
channels (channels_a)
{
set_network_version (network_version_a);
assert (endpoint_a.address ().is_v6 ());
}
@ -30,8 +30,9 @@ bool nano::transport::channel_udp::operator== (nano::transport::channel const &
return result;
}
void nano::transport::channel_udp::send_buffer (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
void nano::transport::channel_udp::send_buffer (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
{
set_last_packet_sent (std::chrono::steady_clock::now ());
channels.send (boost::asio::const_buffer (buffer_a->data (), buffer_a->size ()), endpoint, callback (buffer_a, detail_a, callback_a));
}
@ -62,7 +63,7 @@ std::function<void(boost::system::error_code const &, size_t)> nano::transport::
std::string nano::transport::channel_udp::to_string () const
{
return boost::str (boost::format ("UDP: %1%") % endpoint);
return boost::str (boost::format ("%1%") % endpoint);
}
nano::transport::udp_channels::udp_channels (nano::node & node_a, uint16_t port_a) :
@ -93,7 +94,7 @@ std::shared_ptr<nano::transport::channel_udp> nano::transport::udp_channels::ins
{
assert (endpoint_a.address ().is_v6 ());
std::shared_ptr<nano::transport::channel_udp> result;
if (!not_a_peer (endpoint_a, node.config.allow_local_peers))
if (!node.network.not_a_peer (endpoint_a, node.config.allow_local_peers))
{
std::unique_lock<std::mutex> lock (mutex);
auto existing (channels.get<endpoint_tag> ().find (endpoint_a));
@ -197,109 +198,6 @@ void nano::transport::udp_channels::store_all (nano::node & node_a)
}
}
namespace
{
boost::asio::ip::address_v6 mapped_from_v4_bytes (unsigned long address_a)
{
return boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (address_a));
}
}
bool nano::transport::udp_channels::reserved_address (nano::endpoint const & endpoint_a, bool allow_local_peers)
{
assert (endpoint_a.address ().is_v6 ());
auto bytes (endpoint_a.address ().to_v6 ());
auto result (false);
static auto const rfc1700_min (mapped_from_v4_bytes (0x00000000ul));
static auto const rfc1700_max (mapped_from_v4_bytes (0x00fffffful));
static auto const rfc1918_1_min (mapped_from_v4_bytes (0x0a000000ul));
static auto const rfc1918_1_max (mapped_from_v4_bytes (0x0afffffful));
static auto const rfc1918_2_min (mapped_from_v4_bytes (0xac100000ul));
static auto const rfc1918_2_max (mapped_from_v4_bytes (0xac1ffffful));
static auto const rfc1918_3_min (mapped_from_v4_bytes (0xc0a80000ul));
static auto const rfc1918_3_max (mapped_from_v4_bytes (0xc0a8fffful));
static auto const rfc6598_min (mapped_from_v4_bytes (0x64400000ul));
static auto const rfc6598_max (mapped_from_v4_bytes (0x647ffffful));
static auto const rfc5737_1_min (mapped_from_v4_bytes (0xc0000200ul));
static auto const rfc5737_1_max (mapped_from_v4_bytes (0xc00002fful));
static auto const rfc5737_2_min (mapped_from_v4_bytes (0xc6336400ul));
static auto const rfc5737_2_max (mapped_from_v4_bytes (0xc63364fful));
static auto const rfc5737_3_min (mapped_from_v4_bytes (0xcb007100ul));
static auto const rfc5737_3_max (mapped_from_v4_bytes (0xcb0071fful));
static auto const ipv4_multicast_min (mapped_from_v4_bytes (0xe0000000ul));
static auto const ipv4_multicast_max (mapped_from_v4_bytes (0xeffffffful));
static auto const rfc6890_min (mapped_from_v4_bytes (0xf0000000ul));
static auto const rfc6890_max (mapped_from_v4_bytes (0xfffffffful));
static auto const rfc6666_min (boost::asio::ip::address_v6::from_string ("100::"));
static auto const rfc6666_max (boost::asio::ip::address_v6::from_string ("100::ffff:ffff:ffff:ffff"));
static auto const rfc3849_min (boost::asio::ip::address_v6::from_string ("2001:db8::"));
static auto const rfc3849_max (boost::asio::ip::address_v6::from_string ("2001:db8:ffff:ffff:ffff:ffff:ffff:ffff"));
static auto const rfc4193_min (boost::asio::ip::address_v6::from_string ("fc00::"));
static auto const rfc4193_max (boost::asio::ip::address_v6::from_string ("fd00:ffff:ffff:ffff:ffff:ffff:ffff:ffff"));
static auto const ipv6_multicast_min (boost::asio::ip::address_v6::from_string ("ff00::"));
static auto const ipv6_multicast_max (boost::asio::ip::address_v6::from_string ("ff00:ffff:ffff:ffff:ffff:ffff:ffff:ffff"));
if (bytes >= rfc1700_min && bytes <= rfc1700_max)
{
result = true;
}
else if (bytes >= rfc5737_1_min && bytes <= rfc5737_1_max)
{
result = true;
}
else if (bytes >= rfc5737_2_min && bytes <= rfc5737_2_max)
{
result = true;
}
else if (bytes >= rfc5737_3_min && bytes <= rfc5737_3_max)
{
result = true;
}
else if (bytes >= ipv4_multicast_min && bytes <= ipv4_multicast_max)
{
result = true;
}
else if (bytes >= rfc6890_min && bytes <= rfc6890_max)
{
result = true;
}
else if (bytes >= rfc6666_min && bytes <= rfc6666_max)
{
result = true;
}
else if (bytes >= rfc3849_min && bytes <= rfc3849_max)
{
result = true;
}
else if (bytes >= ipv6_multicast_min && bytes <= ipv6_multicast_max)
{
result = true;
}
else if (!allow_local_peers)
{
if (bytes >= rfc1918_1_min && bytes <= rfc1918_1_max)
{
result = true;
}
else if (bytes >= rfc1918_2_min && bytes <= rfc1918_2_max)
{
result = true;
}
else if (bytes >= rfc1918_3_min && bytes <= rfc1918_3_max)
{
result = true;
}
else if (bytes >= rfc6598_min && bytes <= rfc6598_max)
{
result = true;
}
else if (bytes >= rfc4193_min && bytes <= rfc4193_max)
{
result = true;
}
}
return result;
}
void nano::transport::udp_channels::clean_node_id (nano::endpoint const & endpoint_a, nano::account const & node_id_a)
{
std::lock_guard<std::mutex> lock (mutex);
@ -319,13 +217,13 @@ nano::endpoint nano::transport::udp_channels::tcp_peer ()
{
nano::endpoint result (boost::asio::ip::address_v6::any (), 0);
std::lock_guard<std::mutex> lock (mutex);
for (auto i (channels.get<last_tcp_attempt_tag> ().begin ()), n (channels.get<last_tcp_attempt_tag> ().end ()); i != n;)
for (auto i (channels.get<last_bootstrap_attempt_tag> ().begin ()), n (channels.get<last_bootstrap_attempt_tag> ().end ()); i != n;)
{
if (i->channel->network_version >= protocol_version_reasonable_min)
if (i->channel->get_network_version () >= protocol_version_reasonable_min)
{
result = i->endpoint ();
channels.get<last_tcp_attempt_tag> ().modify (i, [](channel_udp_wrapper & wrapper_a) {
wrapper_a.channel->set_last_tcp_attempt (std::chrono::steady_clock::now ());
channels.get<last_bootstrap_attempt_tag> ().modify (i, [](channel_udp_wrapper & wrapper_a) {
wrapper_a.channel->set_last_bootstrap_attempt (std::chrono::steady_clock::now ());
});
i = n;
}
@ -439,11 +337,16 @@ public:
if (cookie)
{
// New connection
node.network.send_node_id_handshake (endpoint, *cookie, boost::none);
auto channel (node.network.udp_channels.channel (endpoint));
if (channel)
{
node.network.send_keepalive_self (*channel);
node.network.send_node_id_handshake (channel, *cookie, boost::none);
node.network.send_keepalive_self (channel);
}
else
{
channel = std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, endpoint);
node.network.send_node_id_handshake (channel, *cookie, boost::none);
}
}
// Check for special node port data
@ -507,10 +410,10 @@ public:
if (message_a.response->first != node.node_id.pub)
{
node.network.udp_channels.clean_node_id (endpoint, message_a.response->first);
auto channel (node.network.udp_channels.insert (endpoint, message_a.header.version_using));
if (channel)
auto new_channel (node.network.udp_channels.insert (endpoint, message_a.header.version_using));
if (new_channel)
{
node.network.udp_channels.modify (channel, [&message_a](std::shared_ptr<nano::transport::channel_udp> channel_a) {
node.network.udp_channels.modify (new_channel, [&message_a](std::shared_ptr<nano::transport::channel_udp> channel_a) {
channel_a->set_node_id (message_a.response->first);
});
}
@ -527,7 +430,12 @@ public:
}
if (out_query || out_respond_to)
{
node.network.send_node_id_handshake (endpoint, out_query, out_respond_to);
auto channel (node.network.find_channel (endpoint));
if (!channel)
{
channel = std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, endpoint);
}
node.network.send_node_id_handshake (channel, out_query, out_respond_to);
}
message (message_a);
}
@ -544,7 +452,6 @@ public:
}
nano::node & node;
nano::endpoint endpoint;
std::shared_ptr<nano::transport::channel_udp> channel;
};
}
@ -555,7 +462,7 @@ void nano::transport::udp_channels::receive_action (nano::message_buffer * data_
{
allowed_sender = false;
}
else if (reserved_address (data_a->endpoint, node.config.allow_local_peers))
else if (nano::transport::reserved_address (data_a->endpoint, node.config.allow_local_peers))
{
allowed_sender = false;
}
@ -644,28 +551,6 @@ std::shared_ptr<nano::transport::channel> nano::transport::udp_channels::create
return std::make_shared<nano::transport::channel_udp> (*this, endpoint_a);
}
bool nano::transport::udp_channels::not_a_peer (nano::endpoint const & endpoint_a, bool allow_local_peers)
{
bool result (false);
if (endpoint_a.address ().to_v6 ().is_unspecified ())
{
result = true;
}
else if (reserved_address (endpoint_a, allow_local_peers))
{
result = true;
}
else if (endpoint_a == get_local_endpoint ())
{
result = true;
}
else if (!network_params.network.is_test_network () && max_ip_connections (endpoint_a))
{
result = true;
}
return result;
}
bool nano::transport::udp_channels::max_ip_connections (nano::endpoint const & endpoint_a)
{
std::unique_lock<std::mutex> lock (mutex);
@ -673,16 +558,15 @@ bool nano::transport::udp_channels::max_ip_connections (nano::endpoint const & e
return result;
}
bool nano::transport::udp_channels::reachout (nano::endpoint const & endpoint_a, bool allow_local_peers)
bool nano::transport::udp_channels::reachout (nano::endpoint const & endpoint_a)
{
// Don't contact invalid IPs
bool error = node.network.udp_channels.not_a_peer (endpoint_a, allow_local_peers);
// Don't overload single IP
bool error = max_ip_connections (endpoint_a);
if (!error)
{
auto endpoint_l (nano::transport::map_endpoint_to_v6 (endpoint_a));
// Don't keepalive to nodes that already sent us something
nano::transport::channel_udp channel (node.network.udp_channels, endpoint_l);
error |= node.network.udp_channels.channel (endpoint_l) != nullptr;
error |= channel (endpoint_l) != nullptr;
std::lock_guard<std::mutex> lock (mutex);
auto existing (attempts.find (endpoint_l));
error |= existing != attempts.end ();

View file

@ -19,7 +19,7 @@ namespace transport
channel_udp (nano::transport::udp_channels &, nano::endpoint const &, unsigned = nano::protocol_version);
size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) override;
std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
std::string to_string () const override;
bool operator== (nano::transport::channel_udp const & other_a) const
@ -27,58 +27,25 @@ namespace transport
return &channels == &other_a.channels && endpoint == other_a.endpoint;
}
nano::endpoint get_endpoint () const
nano::endpoint get_endpoint () const override
{
std::lock_guard<std::mutex> lk (channel_mutex);
return endpoint;
}
std::chrono::steady_clock::time_point get_last_tcp_attempt () const
nano::tcp_endpoint get_tcp_endpoint () const override
{
std::lock_guard<std::mutex> lk (channel_mutex);
return last_tcp_attempt;
return nano::transport::map_endpoint_to_tcp (endpoint);
}
void set_last_tcp_attempt (std::chrono::steady_clock::time_point const time_a)
nano::transport::transport_type get_type () const override
{
std::lock_guard<std::mutex> lk (channel_mutex);
last_tcp_attempt = time_a;
return nano::transport::transport_type::udp;
}
std::chrono::steady_clock::time_point get_last_packet_received () const
{
std::lock_guard<std::mutex> lk (channel_mutex);
return last_packet_received;
}
void set_last_packet_received (std::chrono::steady_clock::time_point const time_a)
{
std::lock_guard<std::mutex> lk (channel_mutex);
last_packet_received = time_a;
}
boost::optional<nano::account> get_node_id () const
{
std::lock_guard<std::mutex> lk (channel_mutex);
return node_id;
}
void set_node_id (nano::account node_id_a)
{
std::lock_guard<std::mutex> lk (channel_mutex);
node_id = node_id_a;
}
unsigned network_version{ nano::protocol_version };
private:
mutable std::mutex channel_mutex;
nano::endpoint endpoint;
std::chrono::steady_clock::time_point last_tcp_attempt{ std::chrono::steady_clock::time_point () };
std::chrono::steady_clock::time_point last_packet_received{ std::chrono::steady_clock::time_point () };
boost::optional<nano::account> node_id{ boost::none };
private:
nano::transport::udp_channels & channels;
};
class udp_channels final
@ -94,7 +61,6 @@ namespace transport
void random_fill (std::array<nano::endpoint, 8> &) const;
std::unordered_set<std::shared_ptr<nano::transport::channel_udp>> random_set (size_t) const;
void store_all (nano::node &);
bool reserved_address (nano::endpoint const &, bool = false);
void clean_node_id (nano::endpoint const &, nano::account const &);
// Get the next peer for attempting a tcp connection
nano::endpoint tcp_peer ();
@ -106,11 +72,9 @@ namespace transport
void receive_action (nano::message_buffer *);
void process_packets ();
std::shared_ptr<nano::transport::channel> create (nano::endpoint const &);
// Unassigned, reserved, self
bool not_a_peer (nano::endpoint const &, bool);
bool max_ip_connections (nano::endpoint const &);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
bool reachout (nano::endpoint const &);
std::unique_ptr<seq_con_info_component> collect_seq_con_info (std::string const &);
void purge (std::chrono::steady_clock::time_point const &);
void purge_syn_cookies (std::chrono::steady_clock::time_point const &);
@ -144,7 +108,7 @@ namespace transport
class last_packet_received_tag
{
};
class last_tcp_attempt_tag
class last_bootstrap_attempt_tag
{
};
class node_id_tag
@ -162,9 +126,9 @@ namespace transport
{
return channel->get_last_packet_received ();
}
std::chrono::steady_clock::time_point last_tcp_attempt () const
std::chrono::steady_clock::time_point last_bootstrap_attempt () const
{
return channel->get_last_tcp_attempt ();
return channel->get_last_bootstrap_attempt ();
}
boost::asio::ip::address ip_address () const
{
@ -200,7 +164,7 @@ namespace transport
channel_udp_wrapper,
boost::multi_index::indexed_by<
boost::multi_index::random_access<boost::multi_index::tag<random_access_tag>>,
boost::multi_index::ordered_non_unique<boost::multi_index::tag<last_tcp_attempt_tag>, boost::multi_index::const_mem_fun<channel_udp_wrapper, std::chrono::steady_clock::time_point, &channel_udp_wrapper::last_tcp_attempt>>,
boost::multi_index::ordered_non_unique<boost::multi_index::tag<last_bootstrap_attempt_tag>, boost::multi_index::const_mem_fun<channel_udp_wrapper, std::chrono::steady_clock::time_point, &channel_udp_wrapper::last_bootstrap_attempt>>,
boost::multi_index::hashed_unique<boost::multi_index::tag<endpoint_tag>, boost::multi_index::const_mem_fun<channel_udp_wrapper, nano::endpoint, &channel_udp_wrapper::endpoint>>,
boost::multi_index::hashed_non_unique<boost::multi_index::tag<node_id_tag>, boost::multi_index::const_mem_fun<channel_udp_wrapper, nano::account, &channel_udp_wrapper::node_id>>,
boost::multi_index::ordered_non_unique<boost::multi_index::tag<last_packet_received_tag>, boost::multi_index::const_mem_fun<channel_udp_wrapper, std::chrono::steady_clock::time_point, &channel_udp_wrapper::last_packet_received>>,

View file

@ -1939,7 +1939,7 @@ void nano_qt::advanced_actions::refresh_peers ()
QList<QStandardItem *> items;
items.push_back (new QStandardItem (qendpoint));
auto version = new QStandardItem ();
version->setData (QVariant (channel->network_version), Qt::DisplayRole);
version->setData (QVariant (channel->get_network_version ()), Qt::DisplayRole);
items.push_back (version);
QString node_id ("");
auto node_id_l (channel->get_node_id ());

View file

@ -2159,11 +2159,11 @@ TEST (rpc, peers)
ASSERT_EQ (200, response.status);
auto & peers_node (response.json.get_child ("peers"));
ASSERT_EQ (2, peers_node.size ());
ASSERT_EQ (std::to_string (nano::protocol_version), peers_node.get<std::string> ("UDP: [::1]:24001"));
ASSERT_EQ (std::to_string (nano::protocol_version), peers_node.get<std::string> ("[::1]:24001"));
// Previously "[::ffff:80.80.80.80]:4000", but IPv4 address cause "No such node thrown in the test body" issue with peers_node.get
std::stringstream endpoint_text;
endpoint_text << endpoint;
ASSERT_EQ (std::to_string (nano::protocol_version), peers_node.get<std::string> ("UDP: " + endpoint_text.str ()));
ASSERT_EQ (std::to_string (nano::protocol_version), peers_node.get<std::string> (endpoint_text.str ()));
}
TEST (rpc, peers_node_id)
@ -2191,12 +2191,12 @@ TEST (rpc, peers_node_id)
ASSERT_EQ (200, response.status);
auto & peers_node (response.json.get_child ("peers"));
ASSERT_EQ (2, peers_node.size ());
auto tree1 (peers_node.get_child ("UDP: [::1]:24001"));
auto tree1 (peers_node.get_child ("[::1]:24001"));
ASSERT_EQ (std::to_string (nano::protocol_version), tree1.get<std::string> ("protocol_version"));
ASSERT_EQ (system.nodes[1]->node_id.pub.to_account (), tree1.get<std::string> ("node_id"));
std::stringstream endpoint_text;
endpoint_text << endpoint;
auto tree2 (peers_node.get_child ("UDP: " + endpoint_text.str ()));
auto tree2 (peers_node.get_child (endpoint_text.str ()));
ASSERT_EQ (std::to_string (nano::protocol_version), tree2.get<std::string> ("protocol_version"));
ASSERT_EQ ("", tree2.get<std::string> ("node_id"));
}

View file

@ -50,7 +50,7 @@ TEST (system, receive_while_synchronizing)
nano::node_init init1;
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work));
ASSERT_FALSE (init1.error ());
nano::transport::channel_udp channel (node1->network.udp_channels, system.nodes[0]->network.endpoint ());
auto channel (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, system.nodes[0]->network.endpoint ()));
node1->network.send_keepalive (channel);
auto wallet (node1->wallets.create (1));
ASSERT_EQ (key.pub, wallet->insert_adhoc (key.prv));