From 40f53a94e6dc02daa681486561d05705f84c1594 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 17 Nov 2024 16:39:29 +0100 Subject: [PATCH] Node traffic prioritization --- nano/core_test/active_elections.cpp | 4 +- nano/core_test/network.cpp | 56 +++++----- nano/core_test/node.cpp | 4 +- nano/core_test/peer_container.cpp | 2 +- nano/core_test/rep_crawler.cpp | 2 +- nano/core_test/system.cpp | 2 +- nano/core_test/tcp_listener.cpp | 2 +- nano/core_test/telemetry.cpp | 8 +- nano/lib/stats_enums.hpp | 18 +++- nano/node/bandwidth_limiter.cpp | 9 +- nano/node/bootstrap/bootstrap_server.cpp | 17 ++- nano/node/bootstrap/bootstrap_server.hpp | 2 +- nano/node/bootstrap/bootstrap_service.cpp | 22 ++-- nano/node/bootstrap/peer_scoring.cpp | 2 +- nano/node/bootstrap/peer_scoring.hpp | 4 + nano/node/confirmation_solicitor.cpp | 13 +-- nano/node/election.cpp | 2 +- nano/node/json_handler.cpp | 4 +- nano/node/network.cpp | 124 +++++++++++----------- nano/node/network.hpp | 44 +++++--- nano/node/repcrawler.cpp | 16 +-- nano/node/request_aggregator.cpp | 15 +-- nano/node/telemetry.cpp | 4 +- nano/node/transport/channel.cpp | 4 +- nano/node/transport/channel.hpp | 13 +-- nano/node/transport/fake.cpp | 11 +- nano/node/transport/fake.hpp | 6 +- nano/node/transport/inproc.cpp | 8 +- nano/node/transport/inproc.hpp | 2 +- nano/node/transport/tcp_channel.cpp | 60 ++--------- nano/node/transport/tcp_channel.hpp | 6 +- nano/node/transport/tcp_channels.cpp | 21 ++-- nano/node/transport/tcp_channels.hpp | 10 +- nano/node/transport/tcp_socket.cpp | 16 ++- nano/node/transport/tcp_socket.hpp | 7 +- nano/node/transport/traffic_type.hpp | 17 ++- nano/node/vote_generator.cpp | 18 ++-- nano/node/vote_generator.hpp | 4 - nano/node/wallet.cpp | 2 +- nano/qt/qt.cpp | 2 +- nano/slow_test/node.cpp | 2 +- 41 files changed, 269 insertions(+), 316 deletions(-) diff --git a/nano/core_test/active_elections.cpp b/nano/core_test/active_elections.cpp index 7634b4655..57bdc2c05 100644 --- a/nano/core_test/active_elections.cpp +++ b/nano/core_test/active_elections.cpp @@ -970,7 +970,7 @@ TEST (active_elections, fork_replacement_tally) node_config.peering_port = system.get_available_port (); auto & node2 (*system.add_node (node_config)); node1.network.filter.clear (); - node2.network.flood_block (send_last); + node2.network.flood_block (send_last, nano::transport::traffic_type::test); ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0); // Correct block without votes is ignored @@ -984,7 +984,7 @@ TEST (active_elections, fork_replacement_tally) // ensure vote arrives before the block ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ()); node1.network.filter.clear (); - node2.network.flood_block (send_last); + node2.network.flood_block (send_last, nano::transport::traffic_type::test); ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1); // the send_last block should replace one of the existing block of the election because it has higher vote weight diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index a35e08bb3..53d175cfe 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -195,7 +195,7 @@ TEST (network, send_discarded_publish) .build (); { auto transaction = node1.ledger.tx_begin_read (); - node1.network.flood_block (block); + node1.network.flood_block (block, nano::transport::traffic_type::test); ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub)); ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub)); } @@ -221,7 +221,7 @@ TEST (network, send_invalid_publish) .build (); { auto transaction = node1.ledger.tx_begin_read (); - node1.network.flood_block (block); + node1.network.flood_block (block, nano::transport::traffic_type::test); ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub)); ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub)); } @@ -306,7 +306,7 @@ TEST (network, send_insufficient_work) nano::publish publish1{ nano::dev::network_params.network, block1 }; auto tcp_channel (node1.network.tcp_channels.find_node_id (node2.get_node_id ())); ASSERT_NE (nullptr, tcp_channel); - tcp_channel->send (publish1, [] (boost::system::error_code const & ec, size_t size) {}); + tcp_channel->send (publish1, nano::transport::traffic_type::test); ASSERT_EQ (0, node1.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 0); ASSERT_EQ (1, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); @@ -320,7 +320,7 @@ TEST (network, send_insufficient_work) .work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1)) .build (); nano::publish publish2{ nano::dev::network_params.network, block2 }; - tcp_channel->send (publish2, [] (boost::system::error_code const & ec, size_t size) {}); + tcp_channel->send (publish2, nano::transport::traffic_type::test); ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 1); ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); // Legacy block work epoch_1 @@ -333,7 +333,7 @@ TEST (network, send_insufficient_work) .work (*system.work.generate (block2->hash (), node1.network_params.work.epoch_2)) .build (); nano::publish publish3{ nano::dev::network_params.network, block3 }; - tcp_channel->send (publish3, [] (boost::system::error_code const & ec, size_t size) {}); + tcp_channel->send (publish3, nano::transport::traffic_type::test); ASSERT_EQ (0, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in)); ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0); ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in)); @@ -349,7 +349,7 @@ TEST (network, send_insufficient_work) .work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1)) .build (); nano::publish publish4{ nano::dev::network_params.network, block4 }; - tcp_channel->send (publish4, [] (boost::system::error_code const & ec, size_t size) {}); + tcp_channel->send (publish4, nano::transport::traffic_type::test); ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0); ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in)); ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); @@ -632,9 +632,9 @@ TEST (network, duplicate_detection) ASSERT_NE (nullptr, tcp_channel); ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message)); - tcp_channel->send (publish); + tcp_channel->send (publish, nano::transport::traffic_type::test); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 0); - tcp_channel->send (publish); + tcp_channel->send (publish, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 1); } @@ -681,9 +681,9 @@ TEST (network, duplicate_vote_detection) ASSERT_NE (nullptr, tcp_channel); ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message)); - tcp_channel->send (message); + tcp_channel->send (message, nano::transport::traffic_type::test); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); - tcp_channel->send (message); + tcp_channel->send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1); } @@ -711,12 +711,12 @@ TEST (network, duplicate_revert_vote) ASSERT_NE (nullptr, tcp_channel); // First vote should be processed - tcp_channel->send (message1); + tcp_channel->send (message1, nano::transport::traffic_type::test); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); ASSERT_TIMELY (5s, node1.network.filter.check (bytes1.data (), bytes1.size ())); // Second vote should get dropped from processor queue - tcp_channel->send (message2); + tcp_channel->send (message2, nano::transport::traffic_type::test); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); // And the filter should not have it WAIT (500ms); // Give the node time to process the vote @@ -741,9 +741,9 @@ TEST (network, expire_duplicate_filter) // Send a vote ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message)); - tcp_channel->send (message); + tcp_channel->send (message, nano::transport::traffic_type::test); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); - tcp_channel->send (message); + tcp_channel->send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1); // The filter should expire the vote after some time @@ -767,18 +767,18 @@ TEST (network, DISABLED_bandwidth_limiter_4_messages) // Send droppable messages for (auto i = 0; i < message_limit; i += 2) // number of channels { - channel1.send (message); - channel2.send (message); + channel1.send (message, nano::transport::traffic_type::test); + channel2.send (message, nano::transport::traffic_type::test); } // Only sent messages below limit, so we don't expect any drops ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); // Send droppable message; drop stats should increase by one now - channel1.send (message); + channel1.send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); // Send non-droppable message, i.e. drop stats should not increase - channel2.send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + channel2.send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); } @@ -795,10 +795,10 @@ TEST (network, DISABLED_bandwidth_limiter_2_messages) nano::transport::inproc::channel channel1{ node, node }; nano::transport::inproc::channel channel2{ node, node }; // change the bandwidth settings, 2 packets will be dropped - channel1.send (message); - channel2.send (message); - channel1.send (message); - channel2.send (message); + channel1.send (message, nano::transport::traffic_type::test); + channel2.send (message, nano::transport::traffic_type::test); + channel1.send (message, nano::transport::traffic_type::test); + channel2.send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (1s, 2, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); } @@ -815,10 +815,10 @@ TEST (network, bandwidth_limiter_with_burst) nano::transport::inproc::channel channel1{ node, node }; nano::transport::inproc::channel channel2{ node, node }; // change the bandwidth settings, no packet will be dropped - channel1.send (message); - channel2.send (message); - channel1.send (message); - channel2.send (message); + channel1.send (message, nano::transport::traffic_type::test); + channel2.send (message, nano::transport::traffic_type::test); + channel1.send (message, nano::transport::traffic_type::test); + channel2.send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); } @@ -962,7 +962,7 @@ TEST (network, filter_invalid_network_bytes) // send a keepalive, from node2 to node1, with the wrong network bytes nano::keepalive keepalive{ nano::dev::network_params.network }; const_cast (keepalive.header.network) = nano::networks::invalid; - channel->send (keepalive); + channel->send (keepalive, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::invalid_network)); } @@ -981,7 +981,7 @@ TEST (network, filter_invalid_version_using) // send a keepalive, from node2 to node1, with the wrong version_using nano::keepalive keepalive{ nano::dev::network_params.network }; const_cast (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1; - channel->send (keepalive); + channel->send (keepalive, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::outdated_version)); } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index e754474f4..21801313c 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -473,7 +473,7 @@ TEST (node, confirm_locked) .sign (nano::keypair ().prv, 0) .work (0) .build (); - system.nodes[0]->network.flood_block (block); + system.nodes[0]->network.flood_block (block, nano::transport::traffic_type::test); } TEST (node_config, random_rep) @@ -1007,7 +1007,7 @@ TEST (node, fork_no_vote_quorum) nano::confirm_ack confirm{ nano::dev::network_params.network, vote }; auto channel = node2.network.find_node_id (node3.node_id.pub); ASSERT_NE (nullptr, channel); - channel->send (confirm); + channel->send (confirm, nano::transport::traffic_type::test); ASSERT_TIMELY (10s, node3.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in) >= 3); ASSERT_EQ (node1.latest (nano::dev::genesis_key.pub), send1->hash ()); ASSERT_EQ (node2.latest (nano::dev::genesis_key.pub), send1->hash ()); diff --git a/nano/core_test/peer_container.cpp b/nano/core_test/peer_container.cpp index 045943677..7857f43fe 100644 --- a/nano/core_test/peer_container.cpp +++ b/nano/core_test/peer_container.cpp @@ -255,7 +255,7 @@ TEST (peer_container, depeer_on_outdated_version) nano::keepalive keepalive{ nano::dev::network_params.network }; const_cast (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1; ASSERT_TIMELY (5s, channel->alive ()); - channel->send (keepalive); + channel->send (keepalive, nano::transport::traffic_type::test); ASSERT_TIMELY (5s, !channel->alive ()); } diff --git a/nano/core_test/rep_crawler.cpp b/nano/core_test/rep_crawler.cpp index e9d0ba8d4..eae30fe8b 100644 --- a/nano/core_test/rep_crawler.cpp +++ b/nano/core_test/rep_crawler.cpp @@ -323,7 +323,7 @@ TEST (rep_crawler, ignore_rebroadcasted) auto tick = [&] () { nano::confirm_ack msg{ nano::dev::network_params.network, vote, /* rebroadcasted */ true }; - channel2to1->send (msg, nullptr, nano::transport::buffer_drop_policy::no_socket_drop); + channel2to1->send (msg, nano::transport::traffic_type::test); return false; }; diff --git a/nano/core_test/system.cpp b/nano/core_test/system.cpp index 0ac3c28a3..4c30b7b47 100644 --- a/nano/core_test/system.cpp +++ b/nano/core_test/system.cpp @@ -211,7 +211,7 @@ TEST (system, transport_basic) nano::transport::inproc::channel channel{ node0, node1 }; // Send a keepalive message since they are easy to construct nano::keepalive junk{ nano::dev::network_params.network }; - channel.send (junk); + channel.send (junk, nano::transport::traffic_type::test); // Ensure the keepalive has been reecived on the target. ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) > 0); } diff --git a/nano/core_test/tcp_listener.cpp b/nano/core_test/tcp_listener.cpp index 9173806f4..787066f60 100644 --- a/nano/core_test/tcp_listener.cpp +++ b/nano/core_test/tcp_listener.cpp @@ -275,7 +275,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake) auto channel = std::make_shared (*node0, socket); socket->async_connect (node0->tcp_listener.endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) { ASSERT_FALSE (ec); - channel->send (node_id_handshake, [] (boost::system::error_code const & ec, size_t size_a) { + channel->send (node_id_handshake, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); }); }); diff --git a/nano/core_test/telemetry.cpp b/nano/core_test/telemetry.cpp index 859d3ffb3..aa213e776 100644 --- a/nano/core_test/telemetry.cpp +++ b/nano/core_test/telemetry.cpp @@ -143,7 +143,7 @@ TEST (telemetry, dos_tcp) nano::telemetry_req message{ nano::dev::network_params.network }; auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); - channel->send (message, [] (boost::system::error_code const & ec, size_t size_a) { + channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); }); @@ -152,7 +152,7 @@ TEST (telemetry, dos_tcp) auto orig = std::chrono::steady_clock::now (); for (int i = 0; i < 10; ++i) { - channel->send (message, [] (boost::system::error_code const & ec, size_t size_a) { + channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); }); } @@ -165,7 +165,7 @@ TEST (telemetry, dos_tcp) // Now spam messages waiting for it to be processed while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1) { - channel->send (message); + channel->send (message, nano::transport::traffic_type::test); ASSERT_NO_ERROR (system.poll ()); } } @@ -214,7 +214,7 @@ TEST (telemetry, max_possible_size) auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); - channel->send (message, [] (boost::system::error_code const & ec, size_t size_a) { + channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); }); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index cf10c4494..3471dc0b2 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -47,7 +47,7 @@ enum class type confirmation_height, confirmation_observer, confirming_set, - drop, + drop, // TODO: Rename to message_drop aggregator, requests, request_aggregator, @@ -69,6 +69,7 @@ enum class type bootstrap_verify_frontiers, bootstrap_process, bootstrap_request, + bootstrap_request_ec, bootstrap_request_blocks, bootstrap_reply, bootstrap_next, @@ -80,6 +81,8 @@ enum class type bootstrap_server_request, bootstrap_server_overfill, bootstrap_server_response, + bootstrap_server_send, + bootstrap_server_ec, active, active_elections, active_elections_started, @@ -98,6 +101,7 @@ enum class type optimistic_scheduler, handshake, rep_crawler, + rep_crawler_ec, local_block_broadcaster, rep_tiers, syn_cookies, @@ -313,8 +317,18 @@ enum class detail reachout_cached, connected, - // traffic + // traffic type generic, + bootstrap_server, + bootstrap_requests, + block_broadcast, + block_broadcast_initial, + block_broadcast_rpc, + confirmation_requests, + vote_rebroadcast, + vote_reply, + rep_crawler, + telemetry, // tcp tcp_silent_connection_drop, diff --git a/nano/node/bandwidth_limiter.cpp b/nano/node/bandwidth_limiter.cpp index 87e804ddc..0dca8f7d8 100644 --- a/nano/node/bandwidth_limiter.cpp +++ b/nano/node/bandwidth_limiter.cpp @@ -17,16 +17,11 @@ nano::rate_limiter & nano::bandwidth_limiter::select_limiter (nano::transport::t { switch (type) { - case nano::transport::traffic_type::bootstrap: + case nano::transport::traffic_type::bootstrap_server: return limiter_bootstrap; - case nano::transport::traffic_type::generic: - return limiter_generic; - break; default: - debug_assert (false, "missing traffic type"); - break; + return limiter_generic; } - return limiter_generic; } bool nano::bandwidth_limiter::should_pass (std::size_t buffer_size, nano::transport::traffic_type type) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index d8320164d..904f1916c 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -104,7 +104,7 @@ bool nano::bootstrap_server::verify (const nano::asc_pull_req & message) const return std::visit (verify_visitor{}, message.payload); } -bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::shared_ptr channel) +bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::shared_ptr const & channel) { if (!verify (message)) { @@ -113,8 +113,7 @@ bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::s } // If channel is full our response will be dropped anyway, so filter that early - // TODO: Add per channel limits (this ideally should be done on the channel message processing side) - if (channel->max (nano::transport::traffic_type::bootstrap)) + if (channel->max (nano::transport::traffic_type::bootstrap_server)) { stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::in); return false; @@ -171,13 +170,9 @@ void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared on_response.notify (response, channel); channel->send ( - response, [this] (auto & ec, auto size) { - if (ec) - { - stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::write_error, nano::stat::dir::out); - } - }, - nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); + response, nano::transport::traffic_type::bootstrap_server, [this] (auto & ec, auto size) { + stats.inc (nano::stat::type::bootstrap_server_ec, to_stat_detail (ec), nano::stat::dir::out); + }); } void nano::bootstrap_server::run () @@ -220,7 +215,7 @@ void nano::bootstrap_server::run_batch (nano::unique_lock & lock) transaction.refresh_if_needed (); - if (!channel->max (nano::transport::traffic_type::bootstrap)) + if (!channel->max (nano::transport::traffic_type::bootstrap_server)) { auto response = process (transaction, request); respond (response, channel); diff --git a/nano/node/bootstrap/bootstrap_server.hpp b/nano/node/bootstrap/bootstrap_server.hpp index 41c33404d..4a211cafb 100644 --- a/nano/node/bootstrap/bootstrap_server.hpp +++ b/nano/node/bootstrap/bootstrap_server.hpp @@ -42,7 +42,7 @@ public: * Process `asc_pull_req` message coming from network. * Reply will be sent back over passed in `channel` */ - bool request (nano::asc_pull_req const & message, std::shared_ptr channel); + bool request (nano::asc_pull_req const & message, std::shared_ptr const & channel); public: // Events nano::observer_set const &> on_response; diff --git a/nano/node/bootstrap/bootstrap_service.cpp b/nano/node/bootstrap/bootstrap_service.cpp index c2d818824..e0052f2f7 100644 --- a/nano/node/bootstrap/bootstrap_service.cpp +++ b/nano/node/bootstrap/bootstrap_service.cpp @@ -201,14 +201,12 @@ bool nano::bootstrap_service::send (std::shared_ptr co request.update_header (); - stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request, nano::stat::dir::out); - stats.inc (nano::stat::type::bootstrap_request, to_stat_detail (tag.type)); - - channel->send ( - request, [this, id = tag.id] (auto const & ec, auto size) { + bool sent = channel->send ( + request, nano::transport::traffic_type::bootstrap_requests, [this, id = tag.id] (auto const & ec, auto size) { nano::lock_guard lock{ mutex }; if (auto it = tags.get ().find (id); it != tags.get ().end ()) { + stats.inc (nano::stat::type::bootstrap_request_ec, to_stat_detail (ec), nano::stat::dir::out); if (!ec) { stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request_success, nano::stat::dir::out); @@ -222,9 +220,19 @@ bool nano::bootstrap_service::send (std::shared_ptr co stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request_failed, nano::stat::dir::out); tags.get ().erase (it); } - } }, nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); + } }); - return true; // TODO: Return channel send result + if (sent) + { + stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request); + stats.inc (nano::stat::type::bootstrap_request, to_stat_detail (tag.type)); + } + else + { + stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request_failed); + } + + return sent; } std::size_t nano::bootstrap_service::priority_size () const diff --git a/nano/node/bootstrap/peer_scoring.cpp b/nano/node/bootstrap/peer_scoring.cpp index a86bdc574..1e406ab96 100644 --- a/nano/node/bootstrap/peer_scoring.cpp +++ b/nano/node/bootstrap/peer_scoring.cpp @@ -68,7 +68,7 @@ std::shared_ptr nano::bootstrap::peer_scoring::channel { for (auto const & channel : channels) { - if (!channel->max (nano::transport::traffic_type::bootstrap)) + if (!channel->max (traffic_type)) { if (!try_send_message (channel)) { diff --git a/nano/node/bootstrap/peer_scoring.hpp b/nano/node/bootstrap/peer_scoring.hpp index 9b4186539..0bae9255c 100644 --- a/nano/node/bootstrap/peer_scoring.hpp +++ b/nano/node/bootstrap/peer_scoring.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -19,6 +20,9 @@ namespace bootstrap // Container for tracking and scoring peers with respect to bootstrapping class peer_scoring { + public: + static nano::transport::traffic_type constexpr traffic_type = nano::transport::traffic_type::bootstrap_requests; + public: peer_scoring (bootstrap_config const &, nano::network_constants const &); diff --git a/nano/node/confirmation_solicitor.cpp b/nano/node/confirmation_solicitor.cpp index 5695f5f8b..a0a71a032 100644 --- a/nano/node/confirmation_solicitor.cpp +++ b/nano/node/confirmation_solicitor.cpp @@ -44,12 +44,13 @@ bool nano::confirmation_solicitor::broadcast (nano::election const & election_a) bool const different (exists && existing->second.hash != hash); if (!exists || different) { - i->channel->send (winner); + i->channel->send (winner, nano::transport::traffic_type::block_broadcast); count += different ? 0 : 1; } } // Random flood for block propagation - network.flood_message (winner, nano::transport::buffer_drop_policy::limiter, 0.5f); + // TODO: Avoid broadcasting to the same peers that were already broadcasted to + network.flood_message (winner, nano::transport::traffic_type::block_broadcast, 0.5f); error = false; } return error; @@ -71,9 +72,9 @@ bool nano::confirmation_solicitor::add (nano::election const & election_a) bool const different (exists && existing->second.hash != hash); if (!exists || !is_final || different) { - auto & request_queue (requests[rep.channel]); - if (!rep.channel->max ()) + if (!rep.channel->max (nano::transport::traffic_type::confirmation_requests)) { + auto & request_queue (requests[rep.channel]); request_queue.emplace_back (election_a.status.winner->hash (), election_a.status.winner->root ()); count += different ? 0 : 1; error = false; @@ -101,14 +102,14 @@ void nano::confirmation_solicitor::flush () if (roots_hashes_l.size () == nano::network::confirm_req_hashes_max) { nano::confirm_req req{ config.network_params.network, roots_hashes_l }; - channel->send (req); + channel->send (req, nano::transport::traffic_type::confirmation_requests); roots_hashes_l.clear (); } } if (!roots_hashes_l.empty ()) { nano::confirm_req req{ config.network_params.network, roots_hashes_l }; - channel->send (req); + channel->send (req, nano::transport::traffic_type::confirmation_requests); } } prepared = false; diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 125c4675f..d9b329f39 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -572,7 +572,7 @@ bool nano::election::publish (std::shared_ptr const & block_a) if (status.winner->hash () == block_a->hash ()) { status.winner = block_a; - node.network.flood_block (block_a, nano::transport::buffer_drop_policy::no_limiter_drop); + node.network.flood_block (block_a, nano::transport::traffic_type::block_broadcast); } } } diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 708e0e3f9..6cff65f72 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3632,7 +3632,7 @@ void nano::json_handler::republish () } hash = node.ledger.any.block_successor (transaction, hash).value_or (0); } - node.network.flood_block_many (std::move (republish_bundle), nullptr, 25); + node.network.flood_block_many (std::move (republish_bundle), nano::transport::traffic_type::block_broadcast_rpc, 25ms); response_l.put ("success", ""); // obsolete response_l.add_child ("blocks", blocks); } @@ -4867,7 +4867,7 @@ void nano::json_handler::wallet_republish () blocks.push_back (std::make_pair ("", entry)); } } - node.network.flood_block_many (std::move (republish_bundle), nullptr, 25); + node.network.flood_block_many (std::move (republish_bundle), nano::transport::traffic_type::keepalive, 25ms); response_l.add_child ("blocks", blocks); } response_errors (); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index ba9fae79f..12ee9718d 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -233,109 +233,113 @@ void nano::network::run_reachout_cached () } } -void nano::network::send_keepalive (std::shared_ptr const & channel_a) +void nano::network::send_keepalive (std::shared_ptr const & channel) const { nano::keepalive message{ node.network_params.network }; random_fill (message.peers); - channel_a->send (message); + channel->send (message, nano::transport::traffic_type::keepalive); } -void nano::network::send_keepalive_self (std::shared_ptr const & channel_a) +void nano::network::send_keepalive_self (std::shared_ptr const & channel) const { nano::keepalive message{ node.network_params.network }; fill_keepalive_self (message.peers); - channel_a->send (message); + channel->send (message, nano::transport::traffic_type::keepalive); } -void nano::network::flood_message (nano::message & message_a, nano::transport::buffer_drop_policy const drop_policy_a, float const scale_a) +void nano::network::flood_message (nano::message const & message, nano::transport::traffic_type type, float scale) const { - for (auto & i : list (fanout (scale_a))) + for (auto const & channel : list (fanout (scale))) { - i->send (message_a, nullptr, drop_policy_a); + channel->send (message, type); } } -void nano::network::flood_keepalive (float const scale_a) +void nano::network::flood_keepalive (float scale) const { nano::keepalive message{ node.network_params.network }; random_fill (message.peers); - flood_message (message, nano::transport::buffer_drop_policy::limiter, scale_a); + flood_message (message, nano::transport::traffic_type::keepalive, scale); } -void nano::network::flood_keepalive_self (float const scale_a) +void nano::network::flood_keepalive_self (float scale) const { nano::keepalive message{ node.network_params.network }; fill_keepalive_self (message.peers); - flood_message (message, nano::transport::buffer_drop_policy::limiter, scale_a); + flood_message (message, nano::transport::traffic_type::keepalive, scale); } -void nano::network::flood_block (std::shared_ptr const & block, nano::transport::buffer_drop_policy const drop_policy) +void nano::network::flood_block (std::shared_ptr const & block, nano::transport::traffic_type type) const { nano::publish message{ node.network_params.network, block }; - flood_message (message, drop_policy); + flood_message (message, type); } -void nano::network::flood_block_initial (std::shared_ptr const & block) +void nano::network::flood_block_initial (std::shared_ptr const & block) const { nano::publish message{ node.network_params.network, block, /* is_originator */ true }; for (auto const & rep : node.rep_crawler.principal_representatives ()) { - rep.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + rep.channel->send (message, nano::transport::traffic_type::block_broadcast_initial); } for (auto & peer : list_non_pr (fanout (1.0))) { - peer->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + peer->send (message, nano::transport::traffic_type::block_broadcast_initial); } } -void nano::network::flood_vote (std::shared_ptr const & vote, float scale, bool rebroadcasted) +void nano::network::flood_vote (std::shared_ptr const & vote, float scale, bool rebroadcasted) const { nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted }; - for (auto & i : list (fanout (scale))) + for (auto & channel : list (fanout (scale))) { - i->send (message, nullptr); + channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote); } } -void nano::network::flood_vote_non_pr (std::shared_ptr const & vote, float scale, bool rebroadcasted) +void nano::network::flood_vote_non_pr (std::shared_ptr const & vote, float scale, bool rebroadcasted) const { nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted }; - for (auto & i : list_non_pr (fanout (scale))) + for (auto & channel : list_non_pr (fanout (scale))) { - i->send (message, nullptr); + channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote); } } -void nano::network::flood_vote_pr (std::shared_ptr const & vote, bool rebroadcasted) +void nano::network::flood_vote_pr (std::shared_ptr const & vote, bool rebroadcasted) const { nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted }; - for (auto const & i : node.rep_crawler.principal_representatives ()) + for (auto const & channel : node.rep_crawler.principal_representatives ()) { - i.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + channel.channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote); } } -void nano::network::flood_block_many (std::deque> blocks_a, std::function callback_a, unsigned delay_a) +void nano::network::flood_block_many (std::deque> blocks, nano::transport::traffic_type type, std::chrono::milliseconds delay, std::function callback) const { - if (!blocks_a.empty ()) + if (blocks.empty ()) { - auto block_l (blocks_a.front ()); - blocks_a.pop_front (); - flood_block (block_l); - if (!blocks_a.empty ()) - { - std::weak_ptr node_w (node.shared ()); - node.workers.post_delayed (std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a] () { - if (auto node_l = node_w.lock ()) - { - node_l->network.flood_block_many (std::move (blocks), callback_a, delay_a); - } - }); - } - else if (callback_a) - { - callback_a (); - } + return; + } + + auto block = blocks.front (); + blocks.pop_front (); + + flood_block (block, type); + + if (!blocks.empty ()) + { + std::weak_ptr node_w (node.shared ()); + node.workers.post_delayed (delay, [node_w, type, blocks = std::move (blocks), delay, callback] () mutable { + if (auto node_l = node_w.lock ()) + { + node_l->network.flood_block_many (std::move (blocks), type, delay, callback); + } + }); + } + else if (callback) + { + callback (); } } @@ -365,7 +369,7 @@ bool nano::network::merge_peer (nano::endpoint const & peer) return false; // Not initiated } -bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_local_peers) +bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_local_peers) const { bool result (false); if (endpoint_a.address ().to_v6 ().is_unspecified ()) @@ -393,32 +397,32 @@ bool nano::network::track_reachout (nano::endpoint const & endpoint_a) return tcp_channels.track_reachout (endpoint_a); } -std::deque> nano::network::list (std::size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a) +std::deque> nano::network::list (std::size_t max_count, uint8_t minimum_version) const { - std::deque> result; - tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a); - nano::random_pool_shuffle (result.begin (), result.end ()); - if (count_a > 0 && result.size () > count_a) + auto result = tcp_channels.list (minimum_version); + nano::random_pool_shuffle (result.begin (), result.end ()); // Randomize returned peer order + if (max_count > 0 && result.size () > max_count) { - result.resize (count_a, nullptr); + result.resize (max_count, nullptr); } return result; } -std::deque> nano::network::list_non_pr (std::size_t count_a) +std::deque> nano::network::list_non_pr (std::size_t max_count, uint8_t minimum_version) const { - std::deque> result; - tcp_channels.list (result); + auto result = tcp_channels.list (minimum_version); auto partition_point = std::partition (result.begin (), result.end (), [this] (std::shared_ptr const & channel) { return !node.rep_crawler.is_pr (channel); }); result.resize (std::distance (result.begin (), partition_point)); - nano::random_pool_shuffle (result.begin (), result.end ()); - if (result.size () > count_a) + + nano::random_pool_shuffle (result.begin (), result.end ()); // Randomize returned peer order + + if (result.size () > max_count) { - result.resize (count_a, nullptr); + result.resize (max_count, nullptr); } return result; } @@ -429,14 +433,14 @@ std::size_t nano::network::fanout (float scale) const return static_cast (std::ceil (scale * size_sqrt ())); } -std::unordered_set> nano::network::random_set (std::size_t count_a, uint8_t min_version_a, bool include_temporary_channels_a) const +std::unordered_set> nano::network::random_set (std::size_t max_count, uint8_t minimum_version) const { - return tcp_channels.random_set (count_a, min_version_a, include_temporary_channels_a); + return tcp_channels.random_set (max_count, minimum_version); } void nano::network::random_fill (std::array & target_a) const { - auto peers (random_set (target_a.size (), 0, false)); // Don't include channels with ephemeral remote ports + auto peers (random_set (target_a.size (), 0)); debug_assert (peers.size () <= target_a.size ()); auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0)); debug_assert (endpoint.address ().is_v6 ()); diff --git a/nano/node/network.hpp b/nano/node/network.hpp index f9c7bbee1..621e7fdd9 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -90,37 +90,48 @@ public: void start (); void stop (); - void flood_message (nano::message &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter, float const = 1.0f); - void flood_keepalive (float const scale_a = 1.0f); - void flood_keepalive_self (float const scale_a = 0.5f); - void flood_vote (std::shared_ptr const &, float scale, bool rebroadcasted = false); - void flood_vote_pr (std::shared_ptr const &, bool rebroadcasted = false); - void flood_vote_non_pr (std::shared_ptr const &, float scale, bool rebroadcasted = false); + nano::endpoint endpoint () const; + + void flood_message (nano::message const &, nano::transport::traffic_type, float scale = 1.0f) const; + void flood_keepalive (float scale = 1.0f) const; + void flood_keepalive_self (float scale = 0.5f) const; + void flood_vote (std::shared_ptr const &, float scale, bool rebroadcasted = false) const; + void flood_vote_pr (std::shared_ptr const &, bool rebroadcasted = false) const; + void flood_vote_non_pr (std::shared_ptr const &, float scale, bool rebroadcasted = false) const; // Flood block to all PRs and a random selection of non-PRs - void flood_block_initial (std::shared_ptr const &); + void flood_block_initial (std::shared_ptr const &) const; // Flood block to a random selection of peers - void flood_block (std::shared_ptr const &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter); - void flood_block_many (std::deque>, std::function = nullptr, unsigned = broadcast_interval_ms); + void flood_block (std::shared_ptr const &, nano::transport::traffic_type) const; + void flood_block_many (std::deque>, nano::transport::traffic_type, std::chrono::milliseconds delay = 10ms, std::function callback = nullptr) const; + + void send_keepalive (std::shared_ptr const &) const; + void send_keepalive_self (std::shared_ptr const &) const; + void merge_peers (std::array const & ips); bool merge_peer (nano::endpoint const & ip); - void send_keepalive (std::shared_ptr const &); - void send_keepalive_self (std::shared_ptr const &); + std::shared_ptr find_node_id (nano::account const &); std::shared_ptr find_channel (nano::endpoint const &); - bool not_a_peer (nano::endpoint const &, bool allow_local_peers); + + // Check if the endpoint address looks OK + bool not_a_peer (nano::endpoint const &, bool allow_local_peers) const; // Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt bool track_reachout (nano::endpoint const &); - std::deque> list (std::size_t max_count = 0, uint8_t = 0, bool = true); - std::deque> list_non_pr (std::size_t); + + std::deque> list (std::size_t max_count = 0, uint8_t minimum_version = 0) const; + std::deque> list_non_pr (std::size_t max_count, uint8_t minimum_version = 0) const; + // Desired fanout for a given scale std::size_t fanout (float scale = 1.0f) const; + void random_fill (std::array &) const; void fill_keepalive_self (std::array &) const; + // Note: The minimum protocol version is used after the random selection, so number of peers can be less than expected. - std::unordered_set> random_set (std::size_t count, uint8_t min_version = 0, bool include_temporary_channels = false) const; + std::unordered_set> random_set (std::size_t max_count, uint8_t minimum_version = 0) const; + // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (); - nano::endpoint endpoint () const; void cleanup (std::chrono::steady_clock::time_point const & cutoff); std::size_t size () const; float size_sqrt () const; @@ -169,7 +180,6 @@ private: std::thread reachout_cached_thread; public: - static unsigned const broadcast_interval_ms = 10; static std::size_t const buffer_size = 512; static std::size_t confirm_req_hashes_max; diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index b2b98f341..ad4ad2d20 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -260,7 +260,7 @@ std::deque> nano::rep_crawler::prepare // Crawl more aggressively if we lack sufficient total peer weight. auto const required_peer_count = sufficient_weight ? conservative_count : aggressive_count; - auto random_peers = node.network.random_set (required_peer_count, 0, /* include channels with ephemeral remote ports */ true); + auto random_peers = node.network.random_set (required_peer_count); auto should_query = [&, this] (std::shared_ptr const & channel) { if (auto rep = reps.get ().find (channel); rep != reps.get ().end ()) @@ -339,8 +339,6 @@ void nano::rep_crawler::query (std::dequesend ( - req, - [this] (auto & ec, auto size) { - if (ec) - { - stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::write_error, nano::stat::dir::out); - } - }, - nano::transport::buffer_drop_policy::no_socket_drop); + channel->send (req, nano::transport::traffic_type::rep_crawler, [this] (auto & ec, auto size) { + stats.inc (nano::stat::type::rep_crawler_ec, to_stat_detail (ec), nano::stat::dir::out); + }); } else { diff --git a/nano/node/request_aggregator.cpp b/nano/node/request_aggregator.cpp index 3311978d2..8a0c9224f 100644 --- a/nano/node/request_aggregator.cpp +++ b/nano/node/request_aggregator.cpp @@ -26,13 +26,6 @@ nano::request_aggregator::request_aggregator (request_aggregator_config const & generator (generator_a), final_generator (final_generator_a) { - generator.set_reply_action ([this] (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) { - this->reply_action (vote_a, channel_a); - }); - final_generator.set_reply_action ([this] (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) { - this->reply_action (vote_a, channel_a); - }); - queue.max_size_query = [this] (auto const & origin) { return config.max_queue; }; @@ -159,7 +152,7 @@ void nano::request_aggregator::run_batch (nano::unique_lock & lock) transaction.refresh_if_needed (); - if (!channel->max ()) + if (!channel->max (nano::transport::traffic_type::vote_reply)) { process (transaction, request, channel); } @@ -192,12 +185,6 @@ void nano::request_aggregator::process (nano::secure::transaction const & transa } } -void nano::request_aggregator::reply_action (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) const -{ - nano::confirm_ack confirm{ network_constants, vote_a }; - channel_a->send (confirm); -} - void nano::request_aggregator::erase_duplicates (std::vector> & requests_a) const { std::sort (requests_a.begin (), requests_a.end (), [] (auto const & pair1, auto const & pair2) { diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index 7a349ff6b..cadc3a7d8 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -214,7 +214,7 @@ void nano::telemetry::request (std::shared_ptr const & stats.inc (nano::stat::type::telemetry, nano::stat::detail::request); nano::telemetry_req message{ network_params.network }; - channel->send (message); + channel->send (message, nano::transport::traffic_type::telemetry); } void nano::telemetry::run_broadcasts () @@ -233,7 +233,7 @@ void nano::telemetry::broadcast (std::shared_ptr const stats.inc (nano::stat::type::telemetry, nano::stat::detail::broadcast); nano::telemetry_ack message{ network_params.network, telemetry }; - channel->send (message); + channel->send (message, nano::transport::traffic_type::telemetry); } void nano::telemetry::cleanup () diff --git a/nano/node/transport/channel.cpp b/nano/node/transport/channel.cpp index 12bd75387..ea470f2ec 100644 --- a/nano/node/transport/channel.cpp +++ b/nano/node/transport/channel.cpp @@ -14,10 +14,10 @@ nano::transport::channel::channel (nano::node & node_a) : set_network_version (node_a.network_params.network.protocol_version); } -bool nano::transport::channel::send (nano::message const & message, std::function const & callback, nano::transport::buffer_drop_policy drop_policy, nano::transport::traffic_type traffic_type) +bool nano::transport::channel::send (nano::message const & message, nano::transport::traffic_type traffic_type, callback_t callback) { auto buffer = message.to_shared_const_buffer (); - bool sent = send_buffer (buffer, callback, drop_policy, traffic_type); + bool sent = send_buffer (buffer, traffic_type, std::move (callback)); node.stats.inc (sent ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message.type ()), nano::stat::dir::out, /* aggregate all */ true); return sent; } diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index f2d2f8706..5911f1249 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -30,10 +30,7 @@ public: virtual ~channel () = default; /// @returns true if the message was sent (or queued to be sent), false if it was immediately dropped - bool send (nano::message const &, - callback_t const & callback = nullptr, - nano::transport::buffer_drop_policy policy = nano::transport::buffer_drop_policy::limiter, - nano::transport::traffic_type = nano::transport::traffic_type::generic); + bool send (nano::message const &, nano::transport::traffic_type, callback_t = nullptr); virtual void close () = 0; @@ -43,7 +40,7 @@ public: virtual std::string to_string () const = 0; virtual nano::transport::transport_type get_type () const = 0; - virtual bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic) + virtual bool max (nano::transport::traffic_type) { return false; } @@ -123,11 +120,7 @@ public: std::shared_ptr owner () const; protected: - virtual bool send_buffer (nano::shared_const_buffer const &, - callback_t const & callback = nullptr, - nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, - nano::transport::traffic_type = nano::transport::traffic_type::generic) - = 0; + virtual bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, callback_t) = 0; protected: nano::node & node; diff --git a/nano/node/transport/fake.cpp b/nano/node/transport/fake.cpp index 5124b7906..81d3fec7a 100644 --- a/nano/node/transport/fake.cpp +++ b/nano/node/transport/fake.cpp @@ -14,14 +14,13 @@ nano::transport::fake::channel::channel (nano::node & node) : /** * The send function behaves like a null device, it throws the data away and returns success. */ -bool nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) +bool nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback) { - // auto bytes = buffer_a.to_bytes (); - auto size = buffer_a.size (); - if (callback_a) + auto size = buffer.size (); + if (callback) { - node.io_ctx.post ([callback_a, size] () { - callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size); + node.io_ctx.post ([callback, size] () { + callback (boost::system::errc::make_error_code (boost::system::errc::success), size); }); } return true; diff --git a/nano/node/transport/fake.hpp b/nano/node/transport/fake.hpp index 872fa0555..f503b2bd5 100644 --- a/nano/node/transport/fake.hpp +++ b/nano/node/transport/fake.hpp @@ -50,11 +50,7 @@ namespace transport } protected: - bool send_buffer ( - nano::shared_const_buffer const &, - std::function const & = nullptr, - nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, - nano::transport::traffic_type = nano::transport::traffic_type::generic) override; + bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; private: nano::endpoint endpoint; diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp index b55c957f3..78a4be990 100644 --- a/nano/node/transport/inproc.cpp +++ b/nano/node/transport/inproc.cpp @@ -18,10 +18,10 @@ nano::transport::inproc::channel::channel (nano::node & node, nano::node & desti * Send the buffer to the peer and call the callback function when done. The call never fails. * Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background. */ -bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) +bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback) { std::size_t offset{ 0 }; - auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a) { + auto const buffer_read_fn = [&offset, buffer_v = buffer.to_bytes ()] (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a) { debug_assert (buffer_v.size () >= (offset + size_a)); data_a->resize (size_a); auto const copy_start = buffer_v.begin () + offset; @@ -48,9 +48,9 @@ bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co } }); - if (callback_a) + if (callback) { - node.io_ctx.post ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () { + node.io_ctx.post ([callback_l = std::move (callback), buffer_size = buffer.size ()] () { callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size); }); } diff --git a/nano/node/transport/inproc.hpp b/nano/node/transport/inproc.hpp index 0f280c515..dfe932e77 100644 --- a/nano/node/transport/inproc.hpp +++ b/nano/node/transport/inproc.hpp @@ -40,7 +40,7 @@ namespace transport } protected: - bool send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; + bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; private: nano::node & destination; diff --git a/nano/node/transport/tcp_channel.cpp b/nano/node/transport/tcp_channel.cpp index 4396dd2d5..d86542e5a 100644 --- a/nano/node/transport/tcp_channel.cpp +++ b/nano/node/transport/tcp_channel.cpp @@ -75,62 +75,24 @@ bool nano::transport::tcp_channel::max (nano::transport::traffic_type traffic_ty return queue.max (traffic_type); } -bool nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer, std::function const & callback, nano::transport::buffer_drop_policy policy, nano::transport::traffic_type traffic_type) +bool nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type type, nano::transport::channel::callback_t callback) { nano::unique_lock lock{ mutex }; - if (!queue.max (traffic_type) || (policy == buffer_drop_policy::no_socket_drop && !queue.full (traffic_type))) + if (!queue.full (type)) { - queue.push (traffic_type, { buffer, callback }); + queue.push (type, { buffer, callback }); lock.unlock (); node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::queued, nano::stat::dir::out); - node.stats.inc (nano::stat::type::tcp_channel_queued, to_stat_detail (traffic_type), nano::stat::dir::out); + node.stats.inc (nano::stat::type::tcp_channel_queued, to_stat_detail (type), nano::stat::dir::out); sending_task.notify (); return true; } else { node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::drop, nano::stat::dir::out); - node.stats.inc (nano::stat::type::tcp_channel_drop, to_stat_detail (traffic_type), nano::stat::dir::out); + node.stats.inc (nano::stat::type::tcp_channel_drop, to_stat_detail (type), nano::stat::dir::out); } return false; - - // if (!socket->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket->full (traffic_type))) - // { - // socket->async_write ( - // buffer_a, [this_s = shared_from_this (), endpoint_a = socket->remote_endpoint (), node = std::weak_ptr{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) { - // if (auto node_l = node.lock ()) - // { - // if (!ec) - // { - // this_s->set_last_packet_sent (std::chrono::steady_clock::now ()); - // } - // if (ec == boost::system::errc::host_unreachable) - // { - // node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out); - // } - // if (callback_a) - // { - // callback_a (ec, size_a); - // } - // } - // }, - // traffic_type); - // } - // else - // { - // if (policy_a == nano::transport::buffer_drop_policy::no_socket_drop) - // { - // node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); - // } - // else - // { - // node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); - // } - // if (callback_a) - // { - // callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); - // } - // } } asio::awaitable nano::transport::tcp_channel::run_sending (nano::async::condition & condition) @@ -227,10 +189,10 @@ asio::awaitable nano::transport::tcp_channel::wait_socket (nano::transport { debug_assert (strand.running_in_this_thread ()); - auto should_wait = [this, type] () { + auto should_wait = [this] () { if (auto socket_l = socket.lock ()) { - return socket_l->full (type); + return socket_l->full (); } return false; // Abort if the socket is dead }; @@ -373,14 +335,6 @@ auto nano::transport::tcp_channel_queue::next_batch (size_t max_count) -> batch_ size_t nano::transport::tcp_channel_queue::priority (traffic_type type) const { - switch (type) - { - case traffic_type::generic: - return 1; - case traffic_type::bootstrap: - return 1; - } - debug_assert (false); return 1; } diff --git a/nano/node/transport/tcp_channel.hpp b/nano/node/transport/tcp_channel.hpp index 094ea189e..99ea7fd4b 100644 --- a/nano/node/transport/tcp_channel.hpp +++ b/nano/node/transport/tcp_channel.hpp @@ -64,11 +64,7 @@ public: std::string to_string () const override; protected: - bool send_buffer (nano::shared_const_buffer const &, - nano::transport::channel::callback_t const & callback = nullptr, - nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, - nano::transport::traffic_type = nano::transport::traffic_type::generic) - override; + bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; private: void start (); diff --git a/nano/node/transport/tcp_channels.cpp b/nano/node/transport/tcp_channels.cpp index f515d67c2..62cc25b65 100644 --- a/nano/node/transport/tcp_channels.cpp +++ b/nano/node/transport/tcp_channels.cpp @@ -151,7 +151,7 @@ std::shared_ptr nano::transport::tcp_channels::fin return result; } -std::unordered_set> nano::transport::tcp_channels::random_set (std::size_t count_a, uint8_t min_version, bool include_temporary_channels_a) const +std::unordered_set> nano::transport::tcp_channels::random_set (std::size_t count_a, uint8_t min_version) const { std::unordered_set> result; result.reserve (count_a); @@ -378,7 +378,7 @@ void nano::transport::tcp_channels::keepalive () for (auto & channel : to_wakeup) { - channel->send (message); + channel->send (message, nano::transport::traffic_type::keepalive); } } @@ -402,14 +402,19 @@ std::optional nano::transport::tcp_channels::sample_keepalive ( return std::nullopt; } -void nano::transport::tcp_channels::list (std::deque> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a) +std::deque> nano::transport::tcp_channels::list (uint8_t minimum_version) const { nano::lock_guard lock{ mutex }; - // clang-format off - nano::transform_if (channels.get ().begin (), channels.get ().end (), std::back_inserter (deque_a), - [include_temporary_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a; }, - [](auto const & channel) { return channel.channel; }); - // clang-format on + + std::deque> result; + for (auto const & entry : channels) + { + if (entry.channel->get_network_version () >= minimum_version) + { + result.push_back (entry.channel); + } + } + return result; } bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint) diff --git a/nano/node/transport/tcp_channels.hpp b/nano/node/transport/tcp_channels.hpp index fef9f844a..939d2040f 100644 --- a/nano/node/transport/tcp_channels.hpp +++ b/nano/node/transport/tcp_channels.hpp @@ -41,17 +41,17 @@ public: std::size_t size () const; std::shared_ptr find_channel (nano::tcp_endpoint const &) const; void random_fill (std::array &) const; - std::unordered_set> random_set (std::size_t, uint8_t = 0, bool = false) const; std::shared_ptr find_node_id (nano::account const &); // Get the next peer for attempting a tcp connection nano::tcp_endpoint bootstrap_peer (); - bool max_ip_connections (nano::tcp_endpoint const & endpoint_a); - bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); - bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); + bool max_ip_connections (nano::tcp_endpoint const & endpoint); + bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint); + bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint); // Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt bool track_reachout (nano::endpoint const &); void purge (std::chrono::steady_clock::time_point cutoff_deadline); - void list (std::deque> &, uint8_t = 0, bool = true); + std::deque> list (uint8_t minimum_version = 0) const; + std::unordered_set> random_set (std::size_t max_count, uint8_t minimum_version = 0) const; void keepalive (); std::optional sample_keepalive (); diff --git a/nano/node/transport/tcp_socket.cpp b/nano/node/transport/tcp_socket.cpp index 7258cbbc9..afc040608 100644 --- a/nano/node/transport/tcp_socket.cpp +++ b/nano/node/transport/tcp_socket.cpp @@ -140,7 +140,7 @@ void nano::transport::tcp_socket::async_read (std::shared_ptr callback_a, nano::transport::traffic_type traffic_type) +void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a) { auto node_l = node_w.lock (); if (!node_l) @@ -159,7 +159,7 @@ void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const & return; } - bool queued = send_queue.insert (buffer_a, callback_a, traffic_type); + bool queued = send_queue.insert (buffer_a, callback_a, traffic_type::generic); if (!queued) { if (callback_a) @@ -234,14 +234,14 @@ void nano::transport::tcp_socket::write_queued_messages () })); } -bool nano::transport::tcp_socket::max (nano::transport::traffic_type traffic_type) const +bool nano::transport::tcp_socket::max () const { - return send_queue.size (traffic_type) >= max_queue_size; + return send_queue.size (traffic_type::generic) >= max_queue_size; } -bool nano::transport::tcp_socket::full (nano::transport::traffic_type traffic_type) const +bool nano::transport::tcp_socket::full () const { - return send_queue.size (traffic_type) >= 2 * max_queue_size; + return send_queue.size (traffic_type::generic) >= 2 * max_queue_size; } /** Call set_timeout with default_timeout as parameter */ @@ -468,10 +468,6 @@ auto nano::transport::socket_queue::pop () -> std::optional { return item; } - if (auto item = try_pop (nano::transport::traffic_type::bootstrap)) - { - return item; - } return std::nullopt; } diff --git a/nano/node/transport/tcp_socket.hpp b/nano/node/transport/tcp_socket.hpp index c09448533..acf79c5b2 100644 --- a/nano/node/transport/tcp_socket.hpp +++ b/nano/node/transport/tcp_socket.hpp @@ -97,8 +97,7 @@ public: void async_write ( nano::shared_const_buffer const &, - std::function callback = {}, - traffic_type = traffic_type::generic); + std::function callback = nullptr); boost::asio::ip::tcp::endpoint remote_endpoint () const; boost::asio::ip::tcp::endpoint local_endpoint () const; @@ -110,8 +109,8 @@ public: std::chrono::seconds get_default_timeout_value () const; void set_timeout (std::chrono::seconds); - bool max (nano::transport::traffic_type = traffic_type::generic) const; - bool full (nano::transport::traffic_type = traffic_type::generic) const; + bool max () const; + bool full () const; nano::transport::socket_type type () const { diff --git a/nano/node/transport/traffic_type.hpp b/nano/node/transport/traffic_type.hpp index 89128b119..f0f451b3a 100644 --- a/nano/node/transport/traffic_type.hpp +++ b/nano/node/transport/traffic_type.hpp @@ -7,13 +7,22 @@ namespace nano::transport { -/** - * Used for message prioritization and bandwidth limits - */ enum class traffic_type { generic, - bootstrap, // Ascending bootstrap (asc_pull_ack, asc_pull_req) traffic + bootstrap_server, + bootstrap_requests, + block_broadcast, + block_broadcast_initial, + block_broadcast_rpc, + confirmation_requests, + keepalive, + vote, + vote_rebroadcast, + vote_reply, + rep_crawler, + telemetry, + test, }; std::string_view to_string (traffic_type); diff --git a/nano/node/vote_generator.cpp b/nano/node/vote_generator.cpp index 2c0e4292d..c1651a69c 100644 --- a/nano/node/vote_generator.cpp +++ b/nano/node/vote_generator.cpp @@ -173,12 +173,6 @@ std::size_t nano::vote_generator::generate (std::vector const &, std::shared_ptr const &)> action_a) -{ - release_assert (!reply_action); - reply_action = action_a; -} - void nano::vote_generator::broadcast (nano::unique_lock & lock_a) { debug_assert (lock_a.owns_lock ()); @@ -218,6 +212,10 @@ void nano::vote_generator::broadcast (nano::unique_lock & lock_a) void nano::vote_generator::reply (nano::unique_lock & lock_a, request_t && request_a) { + if (request_a.second->max (nano::transport::traffic_type::vote_reply)) + { + return; + } lock_a.unlock (); auto i (request_a.first.cbegin ()); auto n (request_a.first.cend ()); @@ -246,9 +244,11 @@ void nano::vote_generator::reply (nano::unique_lock & lock_a, reque if (!hashes.empty ()) { stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes.size ()); - vote (hashes, roots, [this, &channel = request_a.second] (std::shared_ptr const & vote_a) { - this->reply_action (vote_a, channel); - this->stats.inc (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in); + + vote (hashes, roots, [this, channel = request_a.second] (std::shared_ptr const & vote_a) { + nano::confirm_ack confirm{ config.network_params.network, vote_a }; + channel->send (confirm, nano::transport::traffic_type::vote_reply); + stats.inc (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in); }); } } diff --git a/nano/node/vote_generator.hpp b/nano/node/vote_generator.hpp index 76fc0c583..dbec5fb25 100644 --- a/nano/node/vote_generator.hpp +++ b/nano/node/vote_generator.hpp @@ -40,7 +40,6 @@ public: void add (nano::root const &, nano::block_hash const &); /** Queue blocks for vote generation, returning the number of successful candidates.*/ std::size_t generate (std::vector> const & blocks_a, std::shared_ptr const & channel_a); - void set_reply_action (std::function const &, std::shared_ptr const &)>); void start (); void stop (); @@ -59,9 +58,6 @@ private: bool should_vote (transaction_variant_t const &, nano::root const &, nano::block_hash const &) const; bool broadcast_predicate () const; -private: - std::function const &, std::shared_ptr &)> reply_action; // must be set only during initialization by using set_reply_action - private: // Dependencies nano::node_config const & config; nano::node & node; diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index c2c2101c7..9d2fa9d74 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -970,7 +970,7 @@ std::shared_ptr nano::wallet::send_action (nano::account const & so if (block != nullptr) { cached_block = true; - wallets.node.network.flood_block (block, nano::transport::buffer_drop_policy::no_limiter_drop); + wallets.node.network.flood_block (block, nano::transport::traffic_type::block_broadcast_initial); } } else if (status != MDB_NOTFOUND) diff --git a/nano/qt/qt.cpp b/nano/qt/qt.cpp index 236f0befe..84a215c45 100644 --- a/nano/qt/qt.cpp +++ b/nano/qt/qt.cpp @@ -739,7 +739,7 @@ void nano_qt::block_viewer::rebroadcast_action (nano::block_hash const & hash_a) auto block (wallet.node.ledger.any.block_get (transaction, hash_a)); if (block != nullptr) { - wallet.node.network.flood_block (block); + wallet.node.network.flood_block (block, nano::transport::traffic_type::block_broadcast_initial); auto successor = wallet.node.ledger.any.block_successor (transaction, hash_a); if (successor) { diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 75bf0ea2f..a96e12b47 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -300,7 +300,7 @@ TEST (node, fork_storm) auto open_result (node_i->process (open)); ASSERT_EQ (nano::block_status::progress, open_result); auto transaction (node_i->store.tx_begin_read ()); - node_i->network.flood_block (open); + node_i->network.flood_block (open, nano::transport::traffic_type::test); } } auto again (true);