Node traffic prioritization
This commit is contained in:
parent
0f8bbf80a9
commit
40f53a94e6
41 changed files with 269 additions and 316 deletions
|
|
@ -970,7 +970,7 @@ TEST (active_elections, fork_replacement_tally)
|
||||||
node_config.peering_port = system.get_available_port ();
|
node_config.peering_port = system.get_available_port ();
|
||||||
auto & node2 (*system.add_node (node_config));
|
auto & node2 (*system.add_node (node_config));
|
||||||
node1.network.filter.clear ();
|
node1.network.filter.clear ();
|
||||||
node2.network.flood_block (send_last);
|
node2.network.flood_block (send_last, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0);
|
ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0);
|
||||||
|
|
||||||
// Correct block without votes is ignored
|
// Correct block without votes is ignored
|
||||||
|
|
@ -984,7 +984,7 @@ TEST (active_elections, fork_replacement_tally)
|
||||||
// ensure vote arrives before the block
|
// ensure vote arrives before the block
|
||||||
ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ());
|
ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ());
|
||||||
node1.network.filter.clear ();
|
node1.network.filter.clear ();
|
||||||
node2.network.flood_block (send_last);
|
node2.network.flood_block (send_last, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1);
|
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1);
|
||||||
|
|
||||||
// the send_last block should replace one of the existing block of the election because it has higher vote weight
|
// the send_last block should replace one of the existing block of the election because it has higher vote weight
|
||||||
|
|
|
||||||
|
|
@ -195,7 +195,7 @@ TEST (network, send_discarded_publish)
|
||||||
.build ();
|
.build ();
|
||||||
{
|
{
|
||||||
auto transaction = node1.ledger.tx_begin_read ();
|
auto transaction = node1.ledger.tx_begin_read ();
|
||||||
node1.network.flood_block (block);
|
node1.network.flood_block (block, nano::transport::traffic_type::test);
|
||||||
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
|
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
|
||||||
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
|
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
|
||||||
}
|
}
|
||||||
|
|
@ -221,7 +221,7 @@ TEST (network, send_invalid_publish)
|
||||||
.build ();
|
.build ();
|
||||||
{
|
{
|
||||||
auto transaction = node1.ledger.tx_begin_read ();
|
auto transaction = node1.ledger.tx_begin_read ();
|
||||||
node1.network.flood_block (block);
|
node1.network.flood_block (block, nano::transport::traffic_type::test);
|
||||||
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
|
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
|
||||||
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
|
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
|
||||||
}
|
}
|
||||||
|
|
@ -306,7 +306,7 @@ TEST (network, send_insufficient_work)
|
||||||
nano::publish publish1{ nano::dev::network_params.network, block1 };
|
nano::publish publish1{ nano::dev::network_params.network, block1 };
|
||||||
auto tcp_channel (node1.network.tcp_channels.find_node_id (node2.get_node_id ()));
|
auto tcp_channel (node1.network.tcp_channels.find_node_id (node2.get_node_id ()));
|
||||||
ASSERT_NE (nullptr, tcp_channel);
|
ASSERT_NE (nullptr, tcp_channel);
|
||||||
tcp_channel->send (publish1, [] (boost::system::error_code const & ec, size_t size) {});
|
tcp_channel->send (publish1, nano::transport::traffic_type::test);
|
||||||
ASSERT_EQ (0, node1.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
|
ASSERT_EQ (0, node1.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
|
||||||
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 0);
|
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 0);
|
||||||
ASSERT_EQ (1, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
|
ASSERT_EQ (1, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
|
||||||
|
|
@ -320,7 +320,7 @@ TEST (network, send_insufficient_work)
|
||||||
.work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1))
|
.work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1))
|
||||||
.build ();
|
.build ();
|
||||||
nano::publish publish2{ nano::dev::network_params.network, block2 };
|
nano::publish publish2{ nano::dev::network_params.network, block2 };
|
||||||
tcp_channel->send (publish2, [] (boost::system::error_code const & ec, size_t size) {});
|
tcp_channel->send (publish2, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 1);
|
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 1);
|
||||||
ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
|
ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
|
||||||
// Legacy block work epoch_1
|
// Legacy block work epoch_1
|
||||||
|
|
@ -333,7 +333,7 @@ TEST (network, send_insufficient_work)
|
||||||
.work (*system.work.generate (block2->hash (), node1.network_params.work.epoch_2))
|
.work (*system.work.generate (block2->hash (), node1.network_params.work.epoch_2))
|
||||||
.build ();
|
.build ();
|
||||||
nano::publish publish3{ nano::dev::network_params.network, block3 };
|
nano::publish publish3{ nano::dev::network_params.network, block3 };
|
||||||
tcp_channel->send (publish3, [] (boost::system::error_code const & ec, size_t size) {});
|
tcp_channel->send (publish3, nano::transport::traffic_type::test);
|
||||||
ASSERT_EQ (0, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
|
ASSERT_EQ (0, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
|
||||||
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0);
|
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0);
|
||||||
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
|
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
|
||||||
|
|
@ -349,7 +349,7 @@ TEST (network, send_insufficient_work)
|
||||||
.work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1))
|
.work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1))
|
||||||
.build ();
|
.build ();
|
||||||
nano::publish publish4{ nano::dev::network_params.network, block4 };
|
nano::publish publish4{ nano::dev::network_params.network, block4 };
|
||||||
tcp_channel->send (publish4, [] (boost::system::error_code const & ec, size_t size) {});
|
tcp_channel->send (publish4, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0);
|
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0);
|
||||||
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
|
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
|
||||||
ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
|
ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
|
||||||
|
|
@ -632,9 +632,9 @@ TEST (network, duplicate_detection)
|
||||||
ASSERT_NE (nullptr, tcp_channel);
|
ASSERT_NE (nullptr, tcp_channel);
|
||||||
|
|
||||||
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message));
|
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message));
|
||||||
tcp_channel->send (publish);
|
tcp_channel->send (publish, nano::transport::traffic_type::test);
|
||||||
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 0);
|
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 0);
|
||||||
tcp_channel->send (publish);
|
tcp_channel->send (publish, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 1);
|
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -681,9 +681,9 @@ TEST (network, duplicate_vote_detection)
|
||||||
ASSERT_NE (nullptr, tcp_channel);
|
ASSERT_NE (nullptr, tcp_channel);
|
||||||
|
|
||||||
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));
|
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));
|
||||||
tcp_channel->send (message);
|
tcp_channel->send (message, nano::transport::traffic_type::test);
|
||||||
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
|
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
|
||||||
tcp_channel->send (message);
|
tcp_channel->send (message, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1);
|
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -711,12 +711,12 @@ TEST (network, duplicate_revert_vote)
|
||||||
ASSERT_NE (nullptr, tcp_channel);
|
ASSERT_NE (nullptr, tcp_channel);
|
||||||
|
|
||||||
// First vote should be processed
|
// First vote should be processed
|
||||||
tcp_channel->send (message1);
|
tcp_channel->send (message1, nano::transport::traffic_type::test);
|
||||||
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
|
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
|
||||||
ASSERT_TIMELY (5s, node1.network.filter.check (bytes1.data (), bytes1.size ()));
|
ASSERT_TIMELY (5s, node1.network.filter.check (bytes1.data (), bytes1.size ()));
|
||||||
|
|
||||||
// Second vote should get dropped from processor queue
|
// Second vote should get dropped from processor queue
|
||||||
tcp_channel->send (message2);
|
tcp_channel->send (message2, nano::transport::traffic_type::test);
|
||||||
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
|
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
|
||||||
// And the filter should not have it
|
// And the filter should not have it
|
||||||
WAIT (500ms); // Give the node time to process the vote
|
WAIT (500ms); // Give the node time to process the vote
|
||||||
|
|
@ -741,9 +741,9 @@ TEST (network, expire_duplicate_filter)
|
||||||
|
|
||||||
// Send a vote
|
// Send a vote
|
||||||
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));
|
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));
|
||||||
tcp_channel->send (message);
|
tcp_channel->send (message, nano::transport::traffic_type::test);
|
||||||
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
|
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
|
||||||
tcp_channel->send (message);
|
tcp_channel->send (message, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1);
|
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1);
|
||||||
|
|
||||||
// The filter should expire the vote after some time
|
// The filter should expire the vote after some time
|
||||||
|
|
@ -767,18 +767,18 @@ TEST (network, DISABLED_bandwidth_limiter_4_messages)
|
||||||
// Send droppable messages
|
// Send droppable messages
|
||||||
for (auto i = 0; i < message_limit; i += 2) // number of channels
|
for (auto i = 0; i < message_limit; i += 2) // number of channels
|
||||||
{
|
{
|
||||||
channel1.send (message);
|
channel1.send (message, nano::transport::traffic_type::test);
|
||||||
channel2.send (message);
|
channel2.send (message, nano::transport::traffic_type::test);
|
||||||
}
|
}
|
||||||
// Only sent messages below limit, so we don't expect any drops
|
// Only sent messages below limit, so we don't expect any drops
|
||||||
ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
|
ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
|
||||||
|
|
||||||
// Send droppable message; drop stats should increase by one now
|
// Send droppable message; drop stats should increase by one now
|
||||||
channel1.send (message);
|
channel1.send (message, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
|
ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
|
||||||
|
|
||||||
// Send non-droppable message, i.e. drop stats should not increase
|
// Send non-droppable message, i.e. drop stats should not increase
|
||||||
channel2.send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
|
channel2.send (message, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
|
ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -795,10 +795,10 @@ TEST (network, DISABLED_bandwidth_limiter_2_messages)
|
||||||
nano::transport::inproc::channel channel1{ node, node };
|
nano::transport::inproc::channel channel1{ node, node };
|
||||||
nano::transport::inproc::channel channel2{ node, node };
|
nano::transport::inproc::channel channel2{ node, node };
|
||||||
// change the bandwidth settings, 2 packets will be dropped
|
// change the bandwidth settings, 2 packets will be dropped
|
||||||
channel1.send (message);
|
channel1.send (message, nano::transport::traffic_type::test);
|
||||||
channel2.send (message);
|
channel2.send (message, nano::transport::traffic_type::test);
|
||||||
channel1.send (message);
|
channel1.send (message, nano::transport::traffic_type::test);
|
||||||
channel2.send (message);
|
channel2.send (message, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY_EQ (1s, 2, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
|
ASSERT_TIMELY_EQ (1s, 2, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -815,10 +815,10 @@ TEST (network, bandwidth_limiter_with_burst)
|
||||||
nano::transport::inproc::channel channel1{ node, node };
|
nano::transport::inproc::channel channel1{ node, node };
|
||||||
nano::transport::inproc::channel channel2{ node, node };
|
nano::transport::inproc::channel channel2{ node, node };
|
||||||
// change the bandwidth settings, no packet will be dropped
|
// change the bandwidth settings, no packet will be dropped
|
||||||
channel1.send (message);
|
channel1.send (message, nano::transport::traffic_type::test);
|
||||||
channel2.send (message);
|
channel2.send (message, nano::transport::traffic_type::test);
|
||||||
channel1.send (message);
|
channel1.send (message, nano::transport::traffic_type::test);
|
||||||
channel2.send (message);
|
channel2.send (message, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
|
ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -962,7 +962,7 @@ TEST (network, filter_invalid_network_bytes)
|
||||||
// send a keepalive, from node2 to node1, with the wrong network bytes
|
// send a keepalive, from node2 to node1, with the wrong network bytes
|
||||||
nano::keepalive keepalive{ nano::dev::network_params.network };
|
nano::keepalive keepalive{ nano::dev::network_params.network };
|
||||||
const_cast<nano::networks &> (keepalive.header.network) = nano::networks::invalid;
|
const_cast<nano::networks &> (keepalive.header.network) = nano::networks::invalid;
|
||||||
channel->send (keepalive);
|
channel->send (keepalive, nano::transport::traffic_type::test);
|
||||||
|
|
||||||
ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::invalid_network));
|
ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::invalid_network));
|
||||||
}
|
}
|
||||||
|
|
@ -981,7 +981,7 @@ TEST (network, filter_invalid_version_using)
|
||||||
// send a keepalive, from node2 to node1, with the wrong version_using
|
// send a keepalive, from node2 to node1, with the wrong version_using
|
||||||
nano::keepalive keepalive{ nano::dev::network_params.network };
|
nano::keepalive keepalive{ nano::dev::network_params.network };
|
||||||
const_cast<uint8_t &> (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1;
|
const_cast<uint8_t &> (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1;
|
||||||
channel->send (keepalive);
|
channel->send (keepalive, nano::transport::traffic_type::test);
|
||||||
|
|
||||||
ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::outdated_version));
|
ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::outdated_version));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -473,7 +473,7 @@ TEST (node, confirm_locked)
|
||||||
.sign (nano::keypair ().prv, 0)
|
.sign (nano::keypair ().prv, 0)
|
||||||
.work (0)
|
.work (0)
|
||||||
.build ();
|
.build ();
|
||||||
system.nodes[0]->network.flood_block (block);
|
system.nodes[0]->network.flood_block (block, nano::transport::traffic_type::test);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST (node_config, random_rep)
|
TEST (node_config, random_rep)
|
||||||
|
|
@ -1007,7 +1007,7 @@ TEST (node, fork_no_vote_quorum)
|
||||||
nano::confirm_ack confirm{ nano::dev::network_params.network, vote };
|
nano::confirm_ack confirm{ nano::dev::network_params.network, vote };
|
||||||
auto channel = node2.network.find_node_id (node3.node_id.pub);
|
auto channel = node2.network.find_node_id (node3.node_id.pub);
|
||||||
ASSERT_NE (nullptr, channel);
|
ASSERT_NE (nullptr, channel);
|
||||||
channel->send (confirm);
|
channel->send (confirm, nano::transport::traffic_type::test);
|
||||||
ASSERT_TIMELY (10s, node3.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in) >= 3);
|
ASSERT_TIMELY (10s, node3.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in) >= 3);
|
||||||
ASSERT_EQ (node1.latest (nano::dev::genesis_key.pub), send1->hash ());
|
ASSERT_EQ (node1.latest (nano::dev::genesis_key.pub), send1->hash ());
|
||||||
ASSERT_EQ (node2.latest (nano::dev::genesis_key.pub), send1->hash ());
|
ASSERT_EQ (node2.latest (nano::dev::genesis_key.pub), send1->hash ());
|
||||||
|
|
|
||||||
|
|
@ -255,7 +255,7 @@ TEST (peer_container, depeer_on_outdated_version)
|
||||||
nano::keepalive keepalive{ nano::dev::network_params.network };
|
nano::keepalive keepalive{ nano::dev::network_params.network };
|
||||||
const_cast<uint8_t &> (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1;
|
const_cast<uint8_t &> (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1;
|
||||||
ASSERT_TIMELY (5s, channel->alive ());
|
ASSERT_TIMELY (5s, channel->alive ());
|
||||||
channel->send (keepalive);
|
channel->send (keepalive, nano::transport::traffic_type::test);
|
||||||
|
|
||||||
ASSERT_TIMELY (5s, !channel->alive ());
|
ASSERT_TIMELY (5s, !channel->alive ());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -323,7 +323,7 @@ TEST (rep_crawler, ignore_rebroadcasted)
|
||||||
|
|
||||||
auto tick = [&] () {
|
auto tick = [&] () {
|
||||||
nano::confirm_ack msg{ nano::dev::network_params.network, vote, /* rebroadcasted */ true };
|
nano::confirm_ack msg{ nano::dev::network_params.network, vote, /* rebroadcasted */ true };
|
||||||
channel2to1->send (msg, nullptr, nano::transport::buffer_drop_policy::no_socket_drop);
|
channel2to1->send (msg, nano::transport::traffic_type::test);
|
||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -211,7 +211,7 @@ TEST (system, transport_basic)
|
||||||
nano::transport::inproc::channel channel{ node0, node1 };
|
nano::transport::inproc::channel channel{ node0, node1 };
|
||||||
// Send a keepalive message since they are easy to construct
|
// Send a keepalive message since they are easy to construct
|
||||||
nano::keepalive junk{ nano::dev::network_params.network };
|
nano::keepalive junk{ nano::dev::network_params.network };
|
||||||
channel.send (junk);
|
channel.send (junk, nano::transport::traffic_type::test);
|
||||||
// Ensure the keepalive has been reecived on the target.
|
// Ensure the keepalive has been reecived on the target.
|
||||||
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) > 0);
|
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) > 0);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -275,7 +275,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
|
||||||
auto channel = std::make_shared<nano::transport::tcp_channel> (*node0, socket);
|
auto channel = std::make_shared<nano::transport::tcp_channel> (*node0, socket);
|
||||||
socket->async_connect (node0->tcp_listener.endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) {
|
socket->async_connect (node0->tcp_listener.endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) {
|
||||||
ASSERT_FALSE (ec);
|
ASSERT_FALSE (ec);
|
||||||
channel->send (node_id_handshake, [] (boost::system::error_code const & ec, size_t size_a) {
|
channel->send (node_id_handshake, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) {
|
||||||
ASSERT_FALSE (ec);
|
ASSERT_FALSE (ec);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -143,7 +143,7 @@ TEST (telemetry, dos_tcp)
|
||||||
nano::telemetry_req message{ nano::dev::network_params.network };
|
nano::telemetry_req message{ nano::dev::network_params.network };
|
||||||
auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ());
|
auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ());
|
||||||
ASSERT_NE (nullptr, channel);
|
ASSERT_NE (nullptr, channel);
|
||||||
channel->send (message, [] (boost::system::error_code const & ec, size_t size_a) {
|
channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) {
|
||||||
ASSERT_FALSE (ec);
|
ASSERT_FALSE (ec);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -152,7 +152,7 @@ TEST (telemetry, dos_tcp)
|
||||||
auto orig = std::chrono::steady_clock::now ();
|
auto orig = std::chrono::steady_clock::now ();
|
||||||
for (int i = 0; i < 10; ++i)
|
for (int i = 0; i < 10; ++i)
|
||||||
{
|
{
|
||||||
channel->send (message, [] (boost::system::error_code const & ec, size_t size_a) {
|
channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) {
|
||||||
ASSERT_FALSE (ec);
|
ASSERT_FALSE (ec);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -165,7 +165,7 @@ TEST (telemetry, dos_tcp)
|
||||||
// Now spam messages waiting for it to be processed
|
// Now spam messages waiting for it to be processed
|
||||||
while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1)
|
while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1)
|
||||||
{
|
{
|
||||||
channel->send (message);
|
channel->send (message, nano::transport::traffic_type::test);
|
||||||
ASSERT_NO_ERROR (system.poll ());
|
ASSERT_NO_ERROR (system.poll ());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -214,7 +214,7 @@ TEST (telemetry, max_possible_size)
|
||||||
|
|
||||||
auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ());
|
auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ());
|
||||||
ASSERT_NE (nullptr, channel);
|
ASSERT_NE (nullptr, channel);
|
||||||
channel->send (message, [] (boost::system::error_code const & ec, size_t size_a) {
|
channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) {
|
||||||
ASSERT_FALSE (ec);
|
ASSERT_FALSE (ec);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -47,7 +47,7 @@ enum class type
|
||||||
confirmation_height,
|
confirmation_height,
|
||||||
confirmation_observer,
|
confirmation_observer,
|
||||||
confirming_set,
|
confirming_set,
|
||||||
drop,
|
drop, // TODO: Rename to message_drop
|
||||||
aggregator,
|
aggregator,
|
||||||
requests,
|
requests,
|
||||||
request_aggregator,
|
request_aggregator,
|
||||||
|
|
@ -69,6 +69,7 @@ enum class type
|
||||||
bootstrap_verify_frontiers,
|
bootstrap_verify_frontiers,
|
||||||
bootstrap_process,
|
bootstrap_process,
|
||||||
bootstrap_request,
|
bootstrap_request,
|
||||||
|
bootstrap_request_ec,
|
||||||
bootstrap_request_blocks,
|
bootstrap_request_blocks,
|
||||||
bootstrap_reply,
|
bootstrap_reply,
|
||||||
bootstrap_next,
|
bootstrap_next,
|
||||||
|
|
@ -80,6 +81,8 @@ enum class type
|
||||||
bootstrap_server_request,
|
bootstrap_server_request,
|
||||||
bootstrap_server_overfill,
|
bootstrap_server_overfill,
|
||||||
bootstrap_server_response,
|
bootstrap_server_response,
|
||||||
|
bootstrap_server_send,
|
||||||
|
bootstrap_server_ec,
|
||||||
active,
|
active,
|
||||||
active_elections,
|
active_elections,
|
||||||
active_elections_started,
|
active_elections_started,
|
||||||
|
|
@ -98,6 +101,7 @@ enum class type
|
||||||
optimistic_scheduler,
|
optimistic_scheduler,
|
||||||
handshake,
|
handshake,
|
||||||
rep_crawler,
|
rep_crawler,
|
||||||
|
rep_crawler_ec,
|
||||||
local_block_broadcaster,
|
local_block_broadcaster,
|
||||||
rep_tiers,
|
rep_tiers,
|
||||||
syn_cookies,
|
syn_cookies,
|
||||||
|
|
@ -313,8 +317,18 @@ enum class detail
|
||||||
reachout_cached,
|
reachout_cached,
|
||||||
connected,
|
connected,
|
||||||
|
|
||||||
// traffic
|
// traffic type
|
||||||
generic,
|
generic,
|
||||||
|
bootstrap_server,
|
||||||
|
bootstrap_requests,
|
||||||
|
block_broadcast,
|
||||||
|
block_broadcast_initial,
|
||||||
|
block_broadcast_rpc,
|
||||||
|
confirmation_requests,
|
||||||
|
vote_rebroadcast,
|
||||||
|
vote_reply,
|
||||||
|
rep_crawler,
|
||||||
|
telemetry,
|
||||||
|
|
||||||
// tcp
|
// tcp
|
||||||
tcp_silent_connection_drop,
|
tcp_silent_connection_drop,
|
||||||
|
|
|
||||||
|
|
@ -17,16 +17,11 @@ nano::rate_limiter & nano::bandwidth_limiter::select_limiter (nano::transport::t
|
||||||
{
|
{
|
||||||
switch (type)
|
switch (type)
|
||||||
{
|
{
|
||||||
case nano::transport::traffic_type::bootstrap:
|
case nano::transport::traffic_type::bootstrap_server:
|
||||||
return limiter_bootstrap;
|
return limiter_bootstrap;
|
||||||
case nano::transport::traffic_type::generic:
|
|
||||||
return limiter_generic;
|
|
||||||
break;
|
|
||||||
default:
|
default:
|
||||||
debug_assert (false, "missing traffic type");
|
return limiter_generic;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
return limiter_generic;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool nano::bandwidth_limiter::should_pass (std::size_t buffer_size, nano::transport::traffic_type type)
|
bool nano::bandwidth_limiter::should_pass (std::size_t buffer_size, nano::transport::traffic_type type)
|
||||||
|
|
|
||||||
|
|
@ -104,7 +104,7 @@ bool nano::bootstrap_server::verify (const nano::asc_pull_req & message) const
|
||||||
return std::visit (verify_visitor{}, message.payload);
|
return std::visit (verify_visitor{}, message.payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::shared_ptr<nano::transport::channel> channel)
|
bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::shared_ptr<nano::transport::channel> const & channel)
|
||||||
{
|
{
|
||||||
if (!verify (message))
|
if (!verify (message))
|
||||||
{
|
{
|
||||||
|
|
@ -113,8 +113,7 @@ bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::s
|
||||||
}
|
}
|
||||||
|
|
||||||
// If channel is full our response will be dropped anyway, so filter that early
|
// If channel is full our response will be dropped anyway, so filter that early
|
||||||
// TODO: Add per channel limits (this ideally should be done on the channel message processing side)
|
if (channel->max (nano::transport::traffic_type::bootstrap_server))
|
||||||
if (channel->max (nano::transport::traffic_type::bootstrap))
|
|
||||||
{
|
{
|
||||||
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::in);
|
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::in);
|
||||||
return false;
|
return false;
|
||||||
|
|
@ -171,13 +170,9 @@ void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared
|
||||||
on_response.notify (response, channel);
|
on_response.notify (response, channel);
|
||||||
|
|
||||||
channel->send (
|
channel->send (
|
||||||
response, [this] (auto & ec, auto size) {
|
response, nano::transport::traffic_type::bootstrap_server, [this] (auto & ec, auto size) {
|
||||||
if (ec)
|
stats.inc (nano::stat::type::bootstrap_server_ec, to_stat_detail (ec), nano::stat::dir::out);
|
||||||
{
|
});
|
||||||
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::write_error, nano::stat::dir::out);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::bootstrap_server::run ()
|
void nano::bootstrap_server::run ()
|
||||||
|
|
@ -220,7 +215,7 @@ void nano::bootstrap_server::run_batch (nano::unique_lock<nano::mutex> & lock)
|
||||||
|
|
||||||
transaction.refresh_if_needed ();
|
transaction.refresh_if_needed ();
|
||||||
|
|
||||||
if (!channel->max (nano::transport::traffic_type::bootstrap))
|
if (!channel->max (nano::transport::traffic_type::bootstrap_server))
|
||||||
{
|
{
|
||||||
auto response = process (transaction, request);
|
auto response = process (transaction, request);
|
||||||
respond (response, channel);
|
respond (response, channel);
|
||||||
|
|
|
||||||
|
|
@ -42,7 +42,7 @@ public:
|
||||||
* Process `asc_pull_req` message coming from network.
|
* Process `asc_pull_req` message coming from network.
|
||||||
* Reply will be sent back over passed in `channel`
|
* Reply will be sent back over passed in `channel`
|
||||||
*/
|
*/
|
||||||
bool request (nano::asc_pull_req const & message, std::shared_ptr<nano::transport::channel> channel);
|
bool request (nano::asc_pull_req const & message, std::shared_ptr<nano::transport::channel> const & channel);
|
||||||
|
|
||||||
public: // Events
|
public: // Events
|
||||||
nano::observer_set<nano::asc_pull_ack const &, std::shared_ptr<nano::transport::channel> const &> on_response;
|
nano::observer_set<nano::asc_pull_ack const &, std::shared_ptr<nano::transport::channel> const &> on_response;
|
||||||
|
|
|
||||||
|
|
@ -201,14 +201,12 @@ bool nano::bootstrap_service::send (std::shared_ptr<nano::transport::channel> co
|
||||||
|
|
||||||
request.update_header ();
|
request.update_header ();
|
||||||
|
|
||||||
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request, nano::stat::dir::out);
|
bool sent = channel->send (
|
||||||
stats.inc (nano::stat::type::bootstrap_request, to_stat_detail (tag.type));
|
request, nano::transport::traffic_type::bootstrap_requests, [this, id = tag.id] (auto const & ec, auto size) {
|
||||||
|
|
||||||
channel->send (
|
|
||||||
request, [this, id = tag.id] (auto const & ec, auto size) {
|
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
if (auto it = tags.get<tag_id> ().find (id); it != tags.get<tag_id> ().end ())
|
if (auto it = tags.get<tag_id> ().find (id); it != tags.get<tag_id> ().end ())
|
||||||
{
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap_request_ec, to_stat_detail (ec), nano::stat::dir::out);
|
||||||
if (!ec)
|
if (!ec)
|
||||||
{
|
{
|
||||||
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request_success, nano::stat::dir::out);
|
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request_success, nano::stat::dir::out);
|
||||||
|
|
@ -222,9 +220,19 @@ bool nano::bootstrap_service::send (std::shared_ptr<nano::transport::channel> co
|
||||||
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request_failed, nano::stat::dir::out);
|
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request_failed, nano::stat::dir::out);
|
||||||
tags.get<tag_id> ().erase (it);
|
tags.get<tag_id> ().erase (it);
|
||||||
}
|
}
|
||||||
} }, nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
|
} });
|
||||||
|
|
||||||
return true; // TODO: Return channel send result
|
if (sent)
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request);
|
||||||
|
stats.inc (nano::stat::type::bootstrap_request, to_stat_detail (tag.type));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request_failed);
|
||||||
|
}
|
||||||
|
|
||||||
|
return sent;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::size_t nano::bootstrap_service::priority_size () const
|
std::size_t nano::bootstrap_service::priority_size () const
|
||||||
|
|
|
||||||
|
|
@ -68,7 +68,7 @@ std::shared_ptr<nano::transport::channel> nano::bootstrap::peer_scoring::channel
|
||||||
{
|
{
|
||||||
for (auto const & channel : channels)
|
for (auto const & channel : channels)
|
||||||
{
|
{
|
||||||
if (!channel->max (nano::transport::traffic_type::bootstrap))
|
if (!channel->max (traffic_type))
|
||||||
{
|
{
|
||||||
if (!try_send_message (channel))
|
if (!try_send_message (channel))
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <nano/node/fwd.hpp>
|
#include <nano/node/fwd.hpp>
|
||||||
|
#include <nano/node/transport/traffic_type.hpp>
|
||||||
|
|
||||||
#include <boost/multi_index/hashed_index.hpp>
|
#include <boost/multi_index/hashed_index.hpp>
|
||||||
#include <boost/multi_index/member.hpp>
|
#include <boost/multi_index/member.hpp>
|
||||||
|
|
@ -19,6 +20,9 @@ namespace bootstrap
|
||||||
// Container for tracking and scoring peers with respect to bootstrapping
|
// Container for tracking and scoring peers with respect to bootstrapping
|
||||||
class peer_scoring
|
class peer_scoring
|
||||||
{
|
{
|
||||||
|
public:
|
||||||
|
static nano::transport::traffic_type constexpr traffic_type = nano::transport::traffic_type::bootstrap_requests;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
peer_scoring (bootstrap_config const &, nano::network_constants const &);
|
peer_scoring (bootstrap_config const &, nano::network_constants const &);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -44,12 +44,13 @@ bool nano::confirmation_solicitor::broadcast (nano::election const & election_a)
|
||||||
bool const different (exists && existing->second.hash != hash);
|
bool const different (exists && existing->second.hash != hash);
|
||||||
if (!exists || different)
|
if (!exists || different)
|
||||||
{
|
{
|
||||||
i->channel->send (winner);
|
i->channel->send (winner, nano::transport::traffic_type::block_broadcast);
|
||||||
count += different ? 0 : 1;
|
count += different ? 0 : 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Random flood for block propagation
|
// Random flood for block propagation
|
||||||
network.flood_message (winner, nano::transport::buffer_drop_policy::limiter, 0.5f);
|
// TODO: Avoid broadcasting to the same peers that were already broadcasted to
|
||||||
|
network.flood_message (winner, nano::transport::traffic_type::block_broadcast, 0.5f);
|
||||||
error = false;
|
error = false;
|
||||||
}
|
}
|
||||||
return error;
|
return error;
|
||||||
|
|
@ -71,9 +72,9 @@ bool nano::confirmation_solicitor::add (nano::election const & election_a)
|
||||||
bool const different (exists && existing->second.hash != hash);
|
bool const different (exists && existing->second.hash != hash);
|
||||||
if (!exists || !is_final || different)
|
if (!exists || !is_final || different)
|
||||||
{
|
{
|
||||||
auto & request_queue (requests[rep.channel]);
|
if (!rep.channel->max (nano::transport::traffic_type::confirmation_requests))
|
||||||
if (!rep.channel->max ())
|
|
||||||
{
|
{
|
||||||
|
auto & request_queue (requests[rep.channel]);
|
||||||
request_queue.emplace_back (election_a.status.winner->hash (), election_a.status.winner->root ());
|
request_queue.emplace_back (election_a.status.winner->hash (), election_a.status.winner->root ());
|
||||||
count += different ? 0 : 1;
|
count += different ? 0 : 1;
|
||||||
error = false;
|
error = false;
|
||||||
|
|
@ -101,14 +102,14 @@ void nano::confirmation_solicitor::flush ()
|
||||||
if (roots_hashes_l.size () == nano::network::confirm_req_hashes_max)
|
if (roots_hashes_l.size () == nano::network::confirm_req_hashes_max)
|
||||||
{
|
{
|
||||||
nano::confirm_req req{ config.network_params.network, roots_hashes_l };
|
nano::confirm_req req{ config.network_params.network, roots_hashes_l };
|
||||||
channel->send (req);
|
channel->send (req, nano::transport::traffic_type::confirmation_requests);
|
||||||
roots_hashes_l.clear ();
|
roots_hashes_l.clear ();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!roots_hashes_l.empty ())
|
if (!roots_hashes_l.empty ())
|
||||||
{
|
{
|
||||||
nano::confirm_req req{ config.network_params.network, roots_hashes_l };
|
nano::confirm_req req{ config.network_params.network, roots_hashes_l };
|
||||||
channel->send (req);
|
channel->send (req, nano::transport::traffic_type::confirmation_requests);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
prepared = false;
|
prepared = false;
|
||||||
|
|
|
||||||
|
|
@ -572,7 +572,7 @@ bool nano::election::publish (std::shared_ptr<nano::block> const & block_a)
|
||||||
if (status.winner->hash () == block_a->hash ())
|
if (status.winner->hash () == block_a->hash ())
|
||||||
{
|
{
|
||||||
status.winner = block_a;
|
status.winner = block_a;
|
||||||
node.network.flood_block (block_a, nano::transport::buffer_drop_policy::no_limiter_drop);
|
node.network.flood_block (block_a, nano::transport::traffic_type::block_broadcast);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3632,7 +3632,7 @@ void nano::json_handler::republish ()
|
||||||
}
|
}
|
||||||
hash = node.ledger.any.block_successor (transaction, hash).value_or (0);
|
hash = node.ledger.any.block_successor (transaction, hash).value_or (0);
|
||||||
}
|
}
|
||||||
node.network.flood_block_many (std::move (republish_bundle), nullptr, 25);
|
node.network.flood_block_many (std::move (republish_bundle), nano::transport::traffic_type::block_broadcast_rpc, 25ms);
|
||||||
response_l.put ("success", ""); // obsolete
|
response_l.put ("success", ""); // obsolete
|
||||||
response_l.add_child ("blocks", blocks);
|
response_l.add_child ("blocks", blocks);
|
||||||
}
|
}
|
||||||
|
|
@ -4867,7 +4867,7 @@ void nano::json_handler::wallet_republish ()
|
||||||
blocks.push_back (std::make_pair ("", entry));
|
blocks.push_back (std::make_pair ("", entry));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
node.network.flood_block_many (std::move (republish_bundle), nullptr, 25);
|
node.network.flood_block_many (std::move (republish_bundle), nano::transport::traffic_type::keepalive, 25ms);
|
||||||
response_l.add_child ("blocks", blocks);
|
response_l.add_child ("blocks", blocks);
|
||||||
}
|
}
|
||||||
response_errors ();
|
response_errors ();
|
||||||
|
|
|
||||||
|
|
@ -233,109 +233,113 @@ void nano::network::run_reachout_cached ()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::send_keepalive (std::shared_ptr<nano::transport::channel> const & channel_a)
|
void nano::network::send_keepalive (std::shared_ptr<nano::transport::channel> const & channel) const
|
||||||
{
|
{
|
||||||
nano::keepalive message{ node.network_params.network };
|
nano::keepalive message{ node.network_params.network };
|
||||||
random_fill (message.peers);
|
random_fill (message.peers);
|
||||||
channel_a->send (message);
|
channel->send (message, nano::transport::traffic_type::keepalive);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::send_keepalive_self (std::shared_ptr<nano::transport::channel> const & channel_a)
|
void nano::network::send_keepalive_self (std::shared_ptr<nano::transport::channel> const & channel) const
|
||||||
{
|
{
|
||||||
nano::keepalive message{ node.network_params.network };
|
nano::keepalive message{ node.network_params.network };
|
||||||
fill_keepalive_self (message.peers);
|
fill_keepalive_self (message.peers);
|
||||||
channel_a->send (message);
|
channel->send (message, nano::transport::traffic_type::keepalive);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_message (nano::message & message_a, nano::transport::buffer_drop_policy const drop_policy_a, float const scale_a)
|
void nano::network::flood_message (nano::message const & message, nano::transport::traffic_type type, float scale) const
|
||||||
{
|
{
|
||||||
for (auto & i : list (fanout (scale_a)))
|
for (auto const & channel : list (fanout (scale)))
|
||||||
{
|
{
|
||||||
i->send (message_a, nullptr, drop_policy_a);
|
channel->send (message, type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_keepalive (float const scale_a)
|
void nano::network::flood_keepalive (float scale) const
|
||||||
{
|
{
|
||||||
nano::keepalive message{ node.network_params.network };
|
nano::keepalive message{ node.network_params.network };
|
||||||
random_fill (message.peers);
|
random_fill (message.peers);
|
||||||
flood_message (message, nano::transport::buffer_drop_policy::limiter, scale_a);
|
flood_message (message, nano::transport::traffic_type::keepalive, scale);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_keepalive_self (float const scale_a)
|
void nano::network::flood_keepalive_self (float scale) const
|
||||||
{
|
{
|
||||||
nano::keepalive message{ node.network_params.network };
|
nano::keepalive message{ node.network_params.network };
|
||||||
fill_keepalive_self (message.peers);
|
fill_keepalive_self (message.peers);
|
||||||
flood_message (message, nano::transport::buffer_drop_policy::limiter, scale_a);
|
flood_message (message, nano::transport::traffic_type::keepalive, scale);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_block (std::shared_ptr<nano::block> const & block, nano::transport::buffer_drop_policy const drop_policy)
|
void nano::network::flood_block (std::shared_ptr<nano::block> const & block, nano::transport::traffic_type type) const
|
||||||
{
|
{
|
||||||
nano::publish message{ node.network_params.network, block };
|
nano::publish message{ node.network_params.network, block };
|
||||||
flood_message (message, drop_policy);
|
flood_message (message, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_block_initial (std::shared_ptr<nano::block> const & block)
|
void nano::network::flood_block_initial (std::shared_ptr<nano::block> const & block) const
|
||||||
{
|
{
|
||||||
nano::publish message{ node.network_params.network, block, /* is_originator */ true };
|
nano::publish message{ node.network_params.network, block, /* is_originator */ true };
|
||||||
for (auto const & rep : node.rep_crawler.principal_representatives ())
|
for (auto const & rep : node.rep_crawler.principal_representatives ())
|
||||||
{
|
{
|
||||||
rep.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
|
rep.channel->send (message, nano::transport::traffic_type::block_broadcast_initial);
|
||||||
}
|
}
|
||||||
for (auto & peer : list_non_pr (fanout (1.0)))
|
for (auto & peer : list_non_pr (fanout (1.0)))
|
||||||
{
|
{
|
||||||
peer->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
|
peer->send (message, nano::transport::traffic_type::block_broadcast_initial);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote, float scale, bool rebroadcasted)
|
void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote, float scale, bool rebroadcasted) const
|
||||||
{
|
{
|
||||||
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
|
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
|
||||||
for (auto & i : list (fanout (scale)))
|
for (auto & channel : list (fanout (scale)))
|
||||||
{
|
{
|
||||||
i->send (message, nullptr);
|
channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_vote_non_pr (std::shared_ptr<nano::vote> const & vote, float scale, bool rebroadcasted)
|
void nano::network::flood_vote_non_pr (std::shared_ptr<nano::vote> const & vote, float scale, bool rebroadcasted) const
|
||||||
{
|
{
|
||||||
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
|
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
|
||||||
for (auto & i : list_non_pr (fanout (scale)))
|
for (auto & channel : list_non_pr (fanout (scale)))
|
||||||
{
|
{
|
||||||
i->send (message, nullptr);
|
channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote, bool rebroadcasted)
|
void nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote, bool rebroadcasted) const
|
||||||
{
|
{
|
||||||
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
|
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
|
||||||
for (auto const & i : node.rep_crawler.principal_representatives ())
|
for (auto const & channel : node.rep_crawler.principal_representatives ())
|
||||||
{
|
{
|
||||||
i.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
|
channel.channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::flood_block_many (std::deque<std::shared_ptr<nano::block>> blocks_a, std::function<void ()> callback_a, unsigned delay_a)
|
void nano::network::flood_block_many (std::deque<std::shared_ptr<nano::block>> blocks, nano::transport::traffic_type type, std::chrono::milliseconds delay, std::function<void ()> callback) const
|
||||||
{
|
{
|
||||||
if (!blocks_a.empty ())
|
if (blocks.empty ())
|
||||||
{
|
{
|
||||||
auto block_l (blocks_a.front ());
|
return;
|
||||||
blocks_a.pop_front ();
|
}
|
||||||
flood_block (block_l);
|
|
||||||
if (!blocks_a.empty ())
|
auto block = blocks.front ();
|
||||||
{
|
blocks.pop_front ();
|
||||||
std::weak_ptr<nano::node> node_w (node.shared ());
|
|
||||||
node.workers.post_delayed (std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a] () {
|
flood_block (block, type);
|
||||||
if (auto node_l = node_w.lock ())
|
|
||||||
{
|
if (!blocks.empty ())
|
||||||
node_l->network.flood_block_many (std::move (blocks), callback_a, delay_a);
|
{
|
||||||
}
|
std::weak_ptr<nano::node> node_w (node.shared ());
|
||||||
});
|
node.workers.post_delayed (delay, [node_w, type, blocks = std::move (blocks), delay, callback] () mutable {
|
||||||
}
|
if (auto node_l = node_w.lock ())
|
||||||
else if (callback_a)
|
{
|
||||||
{
|
node_l->network.flood_block_many (std::move (blocks), type, delay, callback);
|
||||||
callback_a ();
|
}
|
||||||
}
|
});
|
||||||
|
}
|
||||||
|
else if (callback)
|
||||||
|
{
|
||||||
|
callback ();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -365,7 +369,7 @@ bool nano::network::merge_peer (nano::endpoint const & peer)
|
||||||
return false; // Not initiated
|
return false; // Not initiated
|
||||||
}
|
}
|
||||||
|
|
||||||
bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_local_peers)
|
bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_local_peers) const
|
||||||
{
|
{
|
||||||
bool result (false);
|
bool result (false);
|
||||||
if (endpoint_a.address ().to_v6 ().is_unspecified ())
|
if (endpoint_a.address ().to_v6 ().is_unspecified ())
|
||||||
|
|
@ -393,32 +397,32 @@ bool nano::network::track_reachout (nano::endpoint const & endpoint_a)
|
||||||
return tcp_channels.track_reachout (endpoint_a);
|
return tcp_channels.track_reachout (endpoint_a);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a)
|
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::size_t max_count, uint8_t minimum_version) const
|
||||||
{
|
{
|
||||||
std::deque<std::shared_ptr<nano::transport::channel>> result;
|
auto result = tcp_channels.list (minimum_version);
|
||||||
tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a);
|
nano::random_pool_shuffle (result.begin (), result.end ()); // Randomize returned peer order
|
||||||
nano::random_pool_shuffle (result.begin (), result.end ());
|
if (max_count > 0 && result.size () > max_count)
|
||||||
if (count_a > 0 && result.size () > count_a)
|
|
||||||
{
|
{
|
||||||
result.resize (count_a, nullptr);
|
result.resize (max_count, nullptr);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr (std::size_t count_a)
|
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr (std::size_t max_count, uint8_t minimum_version) const
|
||||||
{
|
{
|
||||||
std::deque<std::shared_ptr<nano::transport::channel>> result;
|
auto result = tcp_channels.list (minimum_version);
|
||||||
tcp_channels.list (result);
|
|
||||||
|
|
||||||
auto partition_point = std::partition (result.begin (), result.end (),
|
auto partition_point = std::partition (result.begin (), result.end (),
|
||||||
[this] (std::shared_ptr<nano::transport::channel> const & channel) {
|
[this] (std::shared_ptr<nano::transport::channel> const & channel) {
|
||||||
return !node.rep_crawler.is_pr (channel);
|
return !node.rep_crawler.is_pr (channel);
|
||||||
});
|
});
|
||||||
result.resize (std::distance (result.begin (), partition_point));
|
result.resize (std::distance (result.begin (), partition_point));
|
||||||
nano::random_pool_shuffle (result.begin (), result.end ());
|
|
||||||
if (result.size () > count_a)
|
nano::random_pool_shuffle (result.begin (), result.end ()); // Randomize returned peer order
|
||||||
|
|
||||||
|
if (result.size () > max_count)
|
||||||
{
|
{
|
||||||
result.resize (count_a, nullptr);
|
result.resize (max_count, nullptr);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
@ -429,14 +433,14 @@ std::size_t nano::network::fanout (float scale) const
|
||||||
return static_cast<std::size_t> (std::ceil (scale * size_sqrt ()));
|
return static_cast<std::size_t> (std::ceil (scale * size_sqrt ()));
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::random_set (std::size_t count_a, uint8_t min_version_a, bool include_temporary_channels_a) const
|
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::random_set (std::size_t max_count, uint8_t minimum_version) const
|
||||||
{
|
{
|
||||||
return tcp_channels.random_set (count_a, min_version_a, include_temporary_channels_a);
|
return tcp_channels.random_set (max_count, minimum_version);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::network::random_fill (std::array<nano::endpoint, 8> & target_a) const
|
void nano::network::random_fill (std::array<nano::endpoint, 8> & target_a) const
|
||||||
{
|
{
|
||||||
auto peers (random_set (target_a.size (), 0, false)); // Don't include channels with ephemeral remote ports
|
auto peers (random_set (target_a.size (), 0));
|
||||||
debug_assert (peers.size () <= target_a.size ());
|
debug_assert (peers.size () <= target_a.size ());
|
||||||
auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0));
|
auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0));
|
||||||
debug_assert (endpoint.address ().is_v6 ());
|
debug_assert (endpoint.address ().is_v6 ());
|
||||||
|
|
|
||||||
|
|
@ -90,37 +90,48 @@ public:
|
||||||
void start ();
|
void start ();
|
||||||
void stop ();
|
void stop ();
|
||||||
|
|
||||||
void flood_message (nano::message &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter, float const = 1.0f);
|
nano::endpoint endpoint () const;
|
||||||
void flood_keepalive (float const scale_a = 1.0f);
|
|
||||||
void flood_keepalive_self (float const scale_a = 0.5f);
|
void flood_message (nano::message const &, nano::transport::traffic_type, float scale = 1.0f) const;
|
||||||
void flood_vote (std::shared_ptr<nano::vote> const &, float scale, bool rebroadcasted = false);
|
void flood_keepalive (float scale = 1.0f) const;
|
||||||
void flood_vote_pr (std::shared_ptr<nano::vote> const &, bool rebroadcasted = false);
|
void flood_keepalive_self (float scale = 0.5f) const;
|
||||||
void flood_vote_non_pr (std::shared_ptr<nano::vote> const &, float scale, bool rebroadcasted = false);
|
void flood_vote (std::shared_ptr<nano::vote> const &, float scale, bool rebroadcasted = false) const;
|
||||||
|
void flood_vote_pr (std::shared_ptr<nano::vote> const &, bool rebroadcasted = false) const;
|
||||||
|
void flood_vote_non_pr (std::shared_ptr<nano::vote> const &, float scale, bool rebroadcasted = false) const;
|
||||||
// Flood block to all PRs and a random selection of non-PRs
|
// Flood block to all PRs and a random selection of non-PRs
|
||||||
void flood_block_initial (std::shared_ptr<nano::block> const &);
|
void flood_block_initial (std::shared_ptr<nano::block> const &) const;
|
||||||
// Flood block to a random selection of peers
|
// Flood block to a random selection of peers
|
||||||
void flood_block (std::shared_ptr<nano::block> const &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter);
|
void flood_block (std::shared_ptr<nano::block> const &, nano::transport::traffic_type) const;
|
||||||
void flood_block_many (std::deque<std::shared_ptr<nano::block>>, std::function<void ()> = nullptr, unsigned = broadcast_interval_ms);
|
void flood_block_many (std::deque<std::shared_ptr<nano::block>>, nano::transport::traffic_type, std::chrono::milliseconds delay = 10ms, std::function<void ()> callback = nullptr) const;
|
||||||
|
|
||||||
|
void send_keepalive (std::shared_ptr<nano::transport::channel> const &) const;
|
||||||
|
void send_keepalive_self (std::shared_ptr<nano::transport::channel> const &) const;
|
||||||
|
|
||||||
void merge_peers (std::array<nano::endpoint, 8> const & ips);
|
void merge_peers (std::array<nano::endpoint, 8> const & ips);
|
||||||
bool merge_peer (nano::endpoint const & ip);
|
bool merge_peer (nano::endpoint const & ip);
|
||||||
void send_keepalive (std::shared_ptr<nano::transport::channel> const &);
|
|
||||||
void send_keepalive_self (std::shared_ptr<nano::transport::channel> const &);
|
|
||||||
std::shared_ptr<nano::transport::channel> find_node_id (nano::account const &);
|
std::shared_ptr<nano::transport::channel> find_node_id (nano::account const &);
|
||||||
std::shared_ptr<nano::transport::channel> find_channel (nano::endpoint const &);
|
std::shared_ptr<nano::transport::channel> find_channel (nano::endpoint const &);
|
||||||
bool not_a_peer (nano::endpoint const &, bool allow_local_peers);
|
|
||||||
|
// Check if the endpoint address looks OK
|
||||||
|
bool not_a_peer (nano::endpoint const &, bool allow_local_peers) const;
|
||||||
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
|
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
|
||||||
bool track_reachout (nano::endpoint const &);
|
bool track_reachout (nano::endpoint const &);
|
||||||
std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count = 0, uint8_t = 0, bool = true);
|
|
||||||
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t);
|
std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count = 0, uint8_t minimum_version = 0) const;
|
||||||
|
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t max_count, uint8_t minimum_version = 0) const;
|
||||||
|
|
||||||
// Desired fanout for a given scale
|
// Desired fanout for a given scale
|
||||||
std::size_t fanout (float scale = 1.0f) const;
|
std::size_t fanout (float scale = 1.0f) const;
|
||||||
|
|
||||||
void random_fill (std::array<nano::endpoint, 8> &) const;
|
void random_fill (std::array<nano::endpoint, 8> &) const;
|
||||||
void fill_keepalive_self (std::array<nano::endpoint, 8> &) const;
|
void fill_keepalive_self (std::array<nano::endpoint, 8> &) const;
|
||||||
|
|
||||||
// Note: The minimum protocol version is used after the random selection, so number of peers can be less than expected.
|
// Note: The minimum protocol version is used after the random selection, so number of peers can be less than expected.
|
||||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t count, uint8_t min_version = 0, bool include_temporary_channels = false) const;
|
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t max_count, uint8_t minimum_version = 0) const;
|
||||||
|
|
||||||
// Get the next peer for attempting a tcp bootstrap connection
|
// Get the next peer for attempting a tcp bootstrap connection
|
||||||
nano::tcp_endpoint bootstrap_peer ();
|
nano::tcp_endpoint bootstrap_peer ();
|
||||||
nano::endpoint endpoint () const;
|
|
||||||
void cleanup (std::chrono::steady_clock::time_point const & cutoff);
|
void cleanup (std::chrono::steady_clock::time_point const & cutoff);
|
||||||
std::size_t size () const;
|
std::size_t size () const;
|
||||||
float size_sqrt () const;
|
float size_sqrt () const;
|
||||||
|
|
@ -169,7 +180,6 @@ private:
|
||||||
std::thread reachout_cached_thread;
|
std::thread reachout_cached_thread;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
static unsigned const broadcast_interval_ms = 10;
|
|
||||||
static std::size_t const buffer_size = 512;
|
static std::size_t const buffer_size = 512;
|
||||||
|
|
||||||
static std::size_t confirm_req_hashes_max;
|
static std::size_t confirm_req_hashes_max;
|
||||||
|
|
|
||||||
|
|
@ -260,7 +260,7 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare
|
||||||
// Crawl more aggressively if we lack sufficient total peer weight.
|
// Crawl more aggressively if we lack sufficient total peer weight.
|
||||||
auto const required_peer_count = sufficient_weight ? conservative_count : aggressive_count;
|
auto const required_peer_count = sufficient_weight ? conservative_count : aggressive_count;
|
||||||
|
|
||||||
auto random_peers = node.network.random_set (required_peer_count, 0, /* include channels with ephemeral remote ports */ true);
|
auto random_peers = node.network.random_set (required_peer_count);
|
||||||
|
|
||||||
auto should_query = [&, this] (std::shared_ptr<nano::transport::channel> const & channel) {
|
auto should_query = [&, this] (std::shared_ptr<nano::transport::channel> const & channel) {
|
||||||
if (auto rep = reps.get<tag_channel> ().find (channel); rep != reps.get<tag_channel> ().end ())
|
if (auto rep = reps.get<tag_channel> ().find (channel); rep != reps.get<tag_channel> ().end ())
|
||||||
|
|
@ -339,8 +339,6 @@ void nano::rep_crawler::query (std::deque<std::shared_ptr<nano::transport::chann
|
||||||
|
|
||||||
for (const auto & channel : target_channels)
|
for (const auto & channel : target_channels)
|
||||||
{
|
{
|
||||||
debug_assert (channel != nullptr);
|
|
||||||
|
|
||||||
bool tracked = track_rep_request (hash_root, channel);
|
bool tracked = track_rep_request (hash_root, channel);
|
||||||
if (tracked)
|
if (tracked)
|
||||||
{
|
{
|
||||||
|
|
@ -350,15 +348,9 @@ void nano::rep_crawler::query (std::deque<std::shared_ptr<nano::transport::chann
|
||||||
auto const & [hash, root] = hash_root;
|
auto const & [hash, root] = hash_root;
|
||||||
nano::confirm_req req{ network_constants, hash, root };
|
nano::confirm_req req{ network_constants, hash, root };
|
||||||
|
|
||||||
channel->send (
|
channel->send (req, nano::transport::traffic_type::rep_crawler, [this] (auto & ec, auto size) {
|
||||||
req,
|
stats.inc (nano::stat::type::rep_crawler_ec, to_stat_detail (ec), nano::stat::dir::out);
|
||||||
[this] (auto & ec, auto size) {
|
});
|
||||||
if (ec)
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::write_error, nano::stat::dir::out);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
nano::transport::buffer_drop_policy::no_socket_drop);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -26,13 +26,6 @@ nano::request_aggregator::request_aggregator (request_aggregator_config const &
|
||||||
generator (generator_a),
|
generator (generator_a),
|
||||||
final_generator (final_generator_a)
|
final_generator (final_generator_a)
|
||||||
{
|
{
|
||||||
generator.set_reply_action ([this] (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> const & channel_a) {
|
|
||||||
this->reply_action (vote_a, channel_a);
|
|
||||||
});
|
|
||||||
final_generator.set_reply_action ([this] (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> const & channel_a) {
|
|
||||||
this->reply_action (vote_a, channel_a);
|
|
||||||
});
|
|
||||||
|
|
||||||
queue.max_size_query = [this] (auto const & origin) {
|
queue.max_size_query = [this] (auto const & origin) {
|
||||||
return config.max_queue;
|
return config.max_queue;
|
||||||
};
|
};
|
||||||
|
|
@ -159,7 +152,7 @@ void nano::request_aggregator::run_batch (nano::unique_lock<nano::mutex> & lock)
|
||||||
|
|
||||||
transaction.refresh_if_needed ();
|
transaction.refresh_if_needed ();
|
||||||
|
|
||||||
if (!channel->max ())
|
if (!channel->max (nano::transport::traffic_type::vote_reply))
|
||||||
{
|
{
|
||||||
process (transaction, request, channel);
|
process (transaction, request, channel);
|
||||||
}
|
}
|
||||||
|
|
@ -192,12 +185,6 @@ void nano::request_aggregator::process (nano::secure::transaction const & transa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::request_aggregator::reply_action (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> const & channel_a) const
|
|
||||||
{
|
|
||||||
nano::confirm_ack confirm{ network_constants, vote_a };
|
|
||||||
channel_a->send (confirm);
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::request_aggregator::erase_duplicates (std::vector<std::pair<nano::block_hash, nano::root>> & requests_a) const
|
void nano::request_aggregator::erase_duplicates (std::vector<std::pair<nano::block_hash, nano::root>> & requests_a) const
|
||||||
{
|
{
|
||||||
std::sort (requests_a.begin (), requests_a.end (), [] (auto const & pair1, auto const & pair2) {
|
std::sort (requests_a.begin (), requests_a.end (), [] (auto const & pair1, auto const & pair2) {
|
||||||
|
|
|
||||||
|
|
@ -214,7 +214,7 @@ void nano::telemetry::request (std::shared_ptr<nano::transport::channel> const &
|
||||||
stats.inc (nano::stat::type::telemetry, nano::stat::detail::request);
|
stats.inc (nano::stat::type::telemetry, nano::stat::detail::request);
|
||||||
|
|
||||||
nano::telemetry_req message{ network_params.network };
|
nano::telemetry_req message{ network_params.network };
|
||||||
channel->send (message);
|
channel->send (message, nano::transport::traffic_type::telemetry);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::telemetry::run_broadcasts ()
|
void nano::telemetry::run_broadcasts ()
|
||||||
|
|
@ -233,7 +233,7 @@ void nano::telemetry::broadcast (std::shared_ptr<nano::transport::channel> const
|
||||||
stats.inc (nano::stat::type::telemetry, nano::stat::detail::broadcast);
|
stats.inc (nano::stat::type::telemetry, nano::stat::detail::broadcast);
|
||||||
|
|
||||||
nano::telemetry_ack message{ network_params.network, telemetry };
|
nano::telemetry_ack message{ network_params.network, telemetry };
|
||||||
channel->send (message);
|
channel->send (message, nano::transport::traffic_type::telemetry);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::telemetry::cleanup ()
|
void nano::telemetry::cleanup ()
|
||||||
|
|
|
||||||
|
|
@ -14,10 +14,10 @@ nano::transport::channel::channel (nano::node & node_a) :
|
||||||
set_network_version (node_a.network_params.network.protocol_version);
|
set_network_version (node_a.network_params.network.protocol_version);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool nano::transport::channel::send (nano::message const & message, std::function<void (boost::system::error_code const &, std::size_t)> const & callback, nano::transport::buffer_drop_policy drop_policy, nano::transport::traffic_type traffic_type)
|
bool nano::transport::channel::send (nano::message const & message, nano::transport::traffic_type traffic_type, callback_t callback)
|
||||||
{
|
{
|
||||||
auto buffer = message.to_shared_const_buffer ();
|
auto buffer = message.to_shared_const_buffer ();
|
||||||
bool sent = send_buffer (buffer, callback, drop_policy, traffic_type);
|
bool sent = send_buffer (buffer, traffic_type, std::move (callback));
|
||||||
node.stats.inc (sent ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message.type ()), nano::stat::dir::out, /* aggregate all */ true);
|
node.stats.inc (sent ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message.type ()), nano::stat::dir::out, /* aggregate all */ true);
|
||||||
return sent;
|
return sent;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,10 +30,7 @@ public:
|
||||||
virtual ~channel () = default;
|
virtual ~channel () = default;
|
||||||
|
|
||||||
/// @returns true if the message was sent (or queued to be sent), false if it was immediately dropped
|
/// @returns true if the message was sent (or queued to be sent), false if it was immediately dropped
|
||||||
bool send (nano::message const &,
|
bool send (nano::message const &, nano::transport::traffic_type, callback_t = nullptr);
|
||||||
callback_t const & callback = nullptr,
|
|
||||||
nano::transport::buffer_drop_policy policy = nano::transport::buffer_drop_policy::limiter,
|
|
||||||
nano::transport::traffic_type = nano::transport::traffic_type::generic);
|
|
||||||
|
|
||||||
virtual void close () = 0;
|
virtual void close () = 0;
|
||||||
|
|
||||||
|
|
@ -43,7 +40,7 @@ public:
|
||||||
virtual std::string to_string () const = 0;
|
virtual std::string to_string () const = 0;
|
||||||
virtual nano::transport::transport_type get_type () const = 0;
|
virtual nano::transport::transport_type get_type () const = 0;
|
||||||
|
|
||||||
virtual bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic)
|
virtual bool max (nano::transport::traffic_type)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
@ -123,11 +120,7 @@ public:
|
||||||
std::shared_ptr<nano::node> owner () const;
|
std::shared_ptr<nano::node> owner () const;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual bool send_buffer (nano::shared_const_buffer const &,
|
virtual bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, callback_t) = 0;
|
||||||
callback_t const & callback = nullptr,
|
|
||||||
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
|
|
||||||
nano::transport::traffic_type = nano::transport::traffic_type::generic)
|
|
||||||
= 0;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
nano::node & node;
|
nano::node & node;
|
||||||
|
|
|
||||||
|
|
@ -14,14 +14,13 @@ nano::transport::fake::channel::channel (nano::node & node) :
|
||||||
/**
|
/**
|
||||||
* The send function behaves like a null device, it throws the data away and returns success.
|
* The send function behaves like a null device, it throws the data away and returns success.
|
||||||
*/
|
*/
|
||||||
bool nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
|
bool nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback)
|
||||||
{
|
{
|
||||||
// auto bytes = buffer_a.to_bytes ();
|
auto size = buffer.size ();
|
||||||
auto size = buffer_a.size ();
|
if (callback)
|
||||||
if (callback_a)
|
|
||||||
{
|
{
|
||||||
node.io_ctx.post ([callback_a, size] () {
|
node.io_ctx.post ([callback, size] () {
|
||||||
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size);
|
callback (boost::system::errc::make_error_code (boost::system::errc::success), size);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
||||||
|
|
@ -50,11 +50,7 @@ namespace transport
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool send_buffer (
|
bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;
|
||||||
nano::shared_const_buffer const &,
|
|
||||||
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
|
|
||||||
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
|
|
||||||
nano::transport::traffic_type = nano::transport::traffic_type::generic) override;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
nano::endpoint endpoint;
|
nano::endpoint endpoint;
|
||||||
|
|
|
||||||
|
|
@ -18,10 +18,10 @@ nano::transport::inproc::channel::channel (nano::node & node, nano::node & desti
|
||||||
* Send the buffer to the peer and call the callback function when done. The call never fails.
|
* Send the buffer to the peer and call the callback function when done. The call never fails.
|
||||||
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
|
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
|
||||||
*/
|
*/
|
||||||
bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
|
bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback)
|
||||||
{
|
{
|
||||||
std::size_t offset{ 0 };
|
std::size_t offset{ 0 };
|
||||||
auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
|
auto const buffer_read_fn = [&offset, buffer_v = buffer.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
|
||||||
debug_assert (buffer_v.size () >= (offset + size_a));
|
debug_assert (buffer_v.size () >= (offset + size_a));
|
||||||
data_a->resize (size_a);
|
data_a->resize (size_a);
|
||||||
auto const copy_start = buffer_v.begin () + offset;
|
auto const copy_start = buffer_v.begin () + offset;
|
||||||
|
|
@ -48,9 +48,9 @@ bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if (callback_a)
|
if (callback)
|
||||||
{
|
{
|
||||||
node.io_ctx.post ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () {
|
node.io_ctx.post ([callback_l = std::move (callback), buffer_size = buffer.size ()] () {
|
||||||
callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size);
|
callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ namespace transport
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override;
|
bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
nano::node & destination;
|
nano::node & destination;
|
||||||
|
|
|
||||||
|
|
@ -75,62 +75,24 @@ bool nano::transport::tcp_channel::max (nano::transport::traffic_type traffic_ty
|
||||||
return queue.max (traffic_type);
|
return queue.max (traffic_type);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer, std::function<void (boost::system::error_code const &, std::size_t)> const & callback, nano::transport::buffer_drop_policy policy, nano::transport::traffic_type traffic_type)
|
bool nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type type, nano::transport::channel::callback_t callback)
|
||||||
{
|
{
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
if (!queue.max (traffic_type) || (policy == buffer_drop_policy::no_socket_drop && !queue.full (traffic_type)))
|
if (!queue.full (type))
|
||||||
{
|
{
|
||||||
queue.push (traffic_type, { buffer, callback });
|
queue.push (type, { buffer, callback });
|
||||||
lock.unlock ();
|
lock.unlock ();
|
||||||
node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::queued, nano::stat::dir::out);
|
node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::queued, nano::stat::dir::out);
|
||||||
node.stats.inc (nano::stat::type::tcp_channel_queued, to_stat_detail (traffic_type), nano::stat::dir::out);
|
node.stats.inc (nano::stat::type::tcp_channel_queued, to_stat_detail (type), nano::stat::dir::out);
|
||||||
sending_task.notify ();
|
sending_task.notify ();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::drop, nano::stat::dir::out);
|
node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::drop, nano::stat::dir::out);
|
||||||
node.stats.inc (nano::stat::type::tcp_channel_drop, to_stat_detail (traffic_type), nano::stat::dir::out);
|
node.stats.inc (nano::stat::type::tcp_channel_drop, to_stat_detail (type), nano::stat::dir::out);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
// if (!socket->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket->full (traffic_type)))
|
|
||||||
// {
|
|
||||||
// socket->async_write (
|
|
||||||
// buffer_a, [this_s = shared_from_this (), endpoint_a = socket->remote_endpoint (), node = std::weak_ptr<nano::node>{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
|
|
||||||
// if (auto node_l = node.lock ())
|
|
||||||
// {
|
|
||||||
// if (!ec)
|
|
||||||
// {
|
|
||||||
// this_s->set_last_packet_sent (std::chrono::steady_clock::now ());
|
|
||||||
// }
|
|
||||||
// if (ec == boost::system::errc::host_unreachable)
|
|
||||||
// {
|
|
||||||
// node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out);
|
|
||||||
// }
|
|
||||||
// if (callback_a)
|
|
||||||
// {
|
|
||||||
// callback_a (ec, size_a);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// },
|
|
||||||
// traffic_type);
|
|
||||||
// }
|
|
||||||
// else
|
|
||||||
// {
|
|
||||||
// if (policy_a == nano::transport::buffer_drop_policy::no_socket_drop)
|
|
||||||
// {
|
|
||||||
// node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
|
|
||||||
// }
|
|
||||||
// else
|
|
||||||
// {
|
|
||||||
// node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
|
|
||||||
// }
|
|
||||||
// if (callback_a)
|
|
||||||
// {
|
|
||||||
// callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
asio::awaitable<void> nano::transport::tcp_channel::run_sending (nano::async::condition & condition)
|
asio::awaitable<void> nano::transport::tcp_channel::run_sending (nano::async::condition & condition)
|
||||||
|
|
@ -227,10 +189,10 @@ asio::awaitable<void> nano::transport::tcp_channel::wait_socket (nano::transport
|
||||||
{
|
{
|
||||||
debug_assert (strand.running_in_this_thread ());
|
debug_assert (strand.running_in_this_thread ());
|
||||||
|
|
||||||
auto should_wait = [this, type] () {
|
auto should_wait = [this] () {
|
||||||
if (auto socket_l = socket.lock ())
|
if (auto socket_l = socket.lock ())
|
||||||
{
|
{
|
||||||
return socket_l->full (type);
|
return socket_l->full ();
|
||||||
}
|
}
|
||||||
return false; // Abort if the socket is dead
|
return false; // Abort if the socket is dead
|
||||||
};
|
};
|
||||||
|
|
@ -373,14 +335,6 @@ auto nano::transport::tcp_channel_queue::next_batch (size_t max_count) -> batch_
|
||||||
|
|
||||||
size_t nano::transport::tcp_channel_queue::priority (traffic_type type) const
|
size_t nano::transport::tcp_channel_queue::priority (traffic_type type) const
|
||||||
{
|
{
|
||||||
switch (type)
|
|
||||||
{
|
|
||||||
case traffic_type::generic:
|
|
||||||
return 1;
|
|
||||||
case traffic_type::bootstrap:
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
debug_assert (false);
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -64,11 +64,7 @@ public:
|
||||||
std::string to_string () const override;
|
std::string to_string () const override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool send_buffer (nano::shared_const_buffer const &,
|
bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;
|
||||||
nano::transport::channel::callback_t const & callback = nullptr,
|
|
||||||
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
|
|
||||||
nano::transport::traffic_type = nano::transport::traffic_type::generic)
|
|
||||||
override;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void start ();
|
void start ();
|
||||||
|
|
|
||||||
|
|
@ -151,7 +151,7 @@ std::shared_ptr<nano::transport::tcp_channel> nano::transport::tcp_channels::fin
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::random_set (std::size_t count_a, uint8_t min_version, bool include_temporary_channels_a) const
|
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::random_set (std::size_t count_a, uint8_t min_version) const
|
||||||
{
|
{
|
||||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> result;
|
std::unordered_set<std::shared_ptr<nano::transport::channel>> result;
|
||||||
result.reserve (count_a);
|
result.reserve (count_a);
|
||||||
|
|
@ -378,7 +378,7 @@ void nano::transport::tcp_channels::keepalive ()
|
||||||
|
|
||||||
for (auto & channel : to_wakeup)
|
for (auto & channel : to_wakeup)
|
||||||
{
|
{
|
||||||
channel->send (message);
|
channel->send (message, nano::transport::traffic_type::keepalive);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -402,14 +402,19 @@ std::optional<nano::keepalive> nano::transport::tcp_channels::sample_keepalive (
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::transport::tcp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a)
|
std::deque<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::list (uint8_t minimum_version) const
|
||||||
{
|
{
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
// clang-format off
|
|
||||||
nano::transform_if (channels.get<random_access_tag> ().begin (), channels.get<random_access_tag> ().end (), std::back_inserter (deque_a),
|
std::deque<std::shared_ptr<nano::transport::channel>> result;
|
||||||
[include_temporary_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a; },
|
for (auto const & entry : channels)
|
||||||
[](auto const & channel) { return channel.channel; });
|
{
|
||||||
// clang-format on
|
if (entry.channel->get_network_version () >= minimum_version)
|
||||||
|
{
|
||||||
|
result.push_back (entry.channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
|
bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
|
||||||
|
|
|
||||||
|
|
@ -41,17 +41,17 @@ public:
|
||||||
std::size_t size () const;
|
std::size_t size () const;
|
||||||
std::shared_ptr<nano::transport::tcp_channel> find_channel (nano::tcp_endpoint const &) const;
|
std::shared_ptr<nano::transport::tcp_channel> find_channel (nano::tcp_endpoint const &) const;
|
||||||
void random_fill (std::array<nano::endpoint, 8> &) const;
|
void random_fill (std::array<nano::endpoint, 8> &) const;
|
||||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t, uint8_t = 0, bool = false) const;
|
|
||||||
std::shared_ptr<nano::transport::tcp_channel> find_node_id (nano::account const &);
|
std::shared_ptr<nano::transport::tcp_channel> find_node_id (nano::account const &);
|
||||||
// Get the next peer for attempting a tcp connection
|
// Get the next peer for attempting a tcp connection
|
||||||
nano::tcp_endpoint bootstrap_peer ();
|
nano::tcp_endpoint bootstrap_peer ();
|
||||||
bool max_ip_connections (nano::tcp_endpoint const & endpoint_a);
|
bool max_ip_connections (nano::tcp_endpoint const & endpoint);
|
||||||
bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a);
|
bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint);
|
||||||
bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a);
|
bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint);
|
||||||
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
|
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
|
||||||
bool track_reachout (nano::endpoint const &);
|
bool track_reachout (nano::endpoint const &);
|
||||||
void purge (std::chrono::steady_clock::time_point cutoff_deadline);
|
void purge (std::chrono::steady_clock::time_point cutoff_deadline);
|
||||||
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
|
std::deque<std::shared_ptr<nano::transport::channel>> list (uint8_t minimum_version = 0) const;
|
||||||
|
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t max_count, uint8_t minimum_version = 0) const;
|
||||||
void keepalive ();
|
void keepalive ();
|
||||||
std::optional<nano::keepalive> sample_keepalive ();
|
std::optional<nano::keepalive> sample_keepalive ();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -140,7 +140,7 @@ void nano::transport::tcp_socket::async_read (std::shared_ptr<std::vector<uint8_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a, nano::transport::traffic_type traffic_type)
|
void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
|
||||||
{
|
{
|
||||||
auto node_l = node_w.lock ();
|
auto node_l = node_w.lock ();
|
||||||
if (!node_l)
|
if (!node_l)
|
||||||
|
|
@ -159,7 +159,7 @@ void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const &
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool queued = send_queue.insert (buffer_a, callback_a, traffic_type);
|
bool queued = send_queue.insert (buffer_a, callback_a, traffic_type::generic);
|
||||||
if (!queued)
|
if (!queued)
|
||||||
{
|
{
|
||||||
if (callback_a)
|
if (callback_a)
|
||||||
|
|
@ -234,14 +234,14 @@ void nano::transport::tcp_socket::write_queued_messages ()
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool nano::transport::tcp_socket::max (nano::transport::traffic_type traffic_type) const
|
bool nano::transport::tcp_socket::max () const
|
||||||
{
|
{
|
||||||
return send_queue.size (traffic_type) >= max_queue_size;
|
return send_queue.size (traffic_type::generic) >= max_queue_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool nano::transport::tcp_socket::full (nano::transport::traffic_type traffic_type) const
|
bool nano::transport::tcp_socket::full () const
|
||||||
{
|
{
|
||||||
return send_queue.size (traffic_type) >= 2 * max_queue_size;
|
return send_queue.size (traffic_type::generic) >= 2 * max_queue_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Call set_timeout with default_timeout as parameter */
|
/** Call set_timeout with default_timeout as parameter */
|
||||||
|
|
@ -468,10 +468,6 @@ auto nano::transport::socket_queue::pop () -> std::optional<result_t>
|
||||||
{
|
{
|
||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
if (auto item = try_pop (nano::transport::traffic_type::bootstrap))
|
|
||||||
{
|
|
||||||
return item;
|
|
||||||
}
|
|
||||||
|
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -97,8 +97,7 @@ public:
|
||||||
|
|
||||||
void async_write (
|
void async_write (
|
||||||
nano::shared_const_buffer const &,
|
nano::shared_const_buffer const &,
|
||||||
std::function<void (boost::system::error_code const &, std::size_t)> callback = {},
|
std::function<void (boost::system::error_code const &, std::size_t)> callback = nullptr);
|
||||||
traffic_type = traffic_type::generic);
|
|
||||||
|
|
||||||
boost::asio::ip::tcp::endpoint remote_endpoint () const;
|
boost::asio::ip::tcp::endpoint remote_endpoint () const;
|
||||||
boost::asio::ip::tcp::endpoint local_endpoint () const;
|
boost::asio::ip::tcp::endpoint local_endpoint () const;
|
||||||
|
|
@ -110,8 +109,8 @@ public:
|
||||||
std::chrono::seconds get_default_timeout_value () const;
|
std::chrono::seconds get_default_timeout_value () const;
|
||||||
void set_timeout (std::chrono::seconds);
|
void set_timeout (std::chrono::seconds);
|
||||||
|
|
||||||
bool max (nano::transport::traffic_type = traffic_type::generic) const;
|
bool max () const;
|
||||||
bool full (nano::transport::traffic_type = traffic_type::generic) const;
|
bool full () const;
|
||||||
|
|
||||||
nano::transport::socket_type type () const
|
nano::transport::socket_type type () const
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -7,13 +7,22 @@
|
||||||
|
|
||||||
namespace nano::transport
|
namespace nano::transport
|
||||||
{
|
{
|
||||||
/**
|
|
||||||
* Used for message prioritization and bandwidth limits
|
|
||||||
*/
|
|
||||||
enum class traffic_type
|
enum class traffic_type
|
||||||
{
|
{
|
||||||
generic,
|
generic,
|
||||||
bootstrap, // Ascending bootstrap (asc_pull_ack, asc_pull_req) traffic
|
bootstrap_server,
|
||||||
|
bootstrap_requests,
|
||||||
|
block_broadcast,
|
||||||
|
block_broadcast_initial,
|
||||||
|
block_broadcast_rpc,
|
||||||
|
confirmation_requests,
|
||||||
|
keepalive,
|
||||||
|
vote,
|
||||||
|
vote_rebroadcast,
|
||||||
|
vote_reply,
|
||||||
|
rep_crawler,
|
||||||
|
telemetry,
|
||||||
|
test,
|
||||||
};
|
};
|
||||||
|
|
||||||
std::string_view to_string (traffic_type);
|
std::string_view to_string (traffic_type);
|
||||||
|
|
|
||||||
|
|
@ -173,12 +173,6 @@ std::size_t nano::vote_generator::generate (std::vector<std::shared_ptr<nano::bl
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::vote_generator::set_reply_action (std::function<void (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &)> action_a)
|
|
||||||
{
|
|
||||||
release_assert (!reply_action);
|
|
||||||
reply_action = action_a;
|
|
||||||
}
|
|
||||||
|
|
||||||
void nano::vote_generator::broadcast (nano::unique_lock<nano::mutex> & lock_a)
|
void nano::vote_generator::broadcast (nano::unique_lock<nano::mutex> & lock_a)
|
||||||
{
|
{
|
||||||
debug_assert (lock_a.owns_lock ());
|
debug_assert (lock_a.owns_lock ());
|
||||||
|
|
@ -218,6 +212,10 @@ void nano::vote_generator::broadcast (nano::unique_lock<nano::mutex> & lock_a)
|
||||||
|
|
||||||
void nano::vote_generator::reply (nano::unique_lock<nano::mutex> & lock_a, request_t && request_a)
|
void nano::vote_generator::reply (nano::unique_lock<nano::mutex> & lock_a, request_t && request_a)
|
||||||
{
|
{
|
||||||
|
if (request_a.second->max (nano::transport::traffic_type::vote_reply))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
lock_a.unlock ();
|
lock_a.unlock ();
|
||||||
auto i (request_a.first.cbegin ());
|
auto i (request_a.first.cbegin ());
|
||||||
auto n (request_a.first.cend ());
|
auto n (request_a.first.cend ());
|
||||||
|
|
@ -246,9 +244,11 @@ void nano::vote_generator::reply (nano::unique_lock<nano::mutex> & lock_a, reque
|
||||||
if (!hashes.empty ())
|
if (!hashes.empty ())
|
||||||
{
|
{
|
||||||
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes.size ());
|
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes.size ());
|
||||||
vote (hashes, roots, [this, &channel = request_a.second] (std::shared_ptr<nano::vote> const & vote_a) {
|
|
||||||
this->reply_action (vote_a, channel);
|
vote (hashes, roots, [this, channel = request_a.second] (std::shared_ptr<nano::vote> const & vote_a) {
|
||||||
this->stats.inc (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in);
|
nano::confirm_ack confirm{ config.network_params.network, vote_a };
|
||||||
|
channel->send (confirm, nano::transport::traffic_type::vote_reply);
|
||||||
|
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,6 @@ public:
|
||||||
void add (nano::root const &, nano::block_hash const &);
|
void add (nano::root const &, nano::block_hash const &);
|
||||||
/** Queue blocks for vote generation, returning the number of successful candidates.*/
|
/** Queue blocks for vote generation, returning the number of successful candidates.*/
|
||||||
std::size_t generate (std::vector<std::shared_ptr<nano::block>> const & blocks_a, std::shared_ptr<nano::transport::channel> const & channel_a);
|
std::size_t generate (std::vector<std::shared_ptr<nano::block>> const & blocks_a, std::shared_ptr<nano::transport::channel> const & channel_a);
|
||||||
void set_reply_action (std::function<void (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &)>);
|
|
||||||
|
|
||||||
void start ();
|
void start ();
|
||||||
void stop ();
|
void stop ();
|
||||||
|
|
@ -59,9 +58,6 @@ private:
|
||||||
bool should_vote (transaction_variant_t const &, nano::root const &, nano::block_hash const &) const;
|
bool should_vote (transaction_variant_t const &, nano::root const &, nano::block_hash const &) const;
|
||||||
bool broadcast_predicate () const;
|
bool broadcast_predicate () const;
|
||||||
|
|
||||||
private:
|
|
||||||
std::function<void (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> &)> reply_action; // must be set only during initialization by using set_reply_action
|
|
||||||
|
|
||||||
private: // Dependencies
|
private: // Dependencies
|
||||||
nano::node_config const & config;
|
nano::node_config const & config;
|
||||||
nano::node & node;
|
nano::node & node;
|
||||||
|
|
|
||||||
|
|
@ -970,7 +970,7 @@ std::shared_ptr<nano::block> nano::wallet::send_action (nano::account const & so
|
||||||
if (block != nullptr)
|
if (block != nullptr)
|
||||||
{
|
{
|
||||||
cached_block = true;
|
cached_block = true;
|
||||||
wallets.node.network.flood_block (block, nano::transport::buffer_drop_policy::no_limiter_drop);
|
wallets.node.network.flood_block (block, nano::transport::traffic_type::block_broadcast_initial);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (status != MDB_NOTFOUND)
|
else if (status != MDB_NOTFOUND)
|
||||||
|
|
|
||||||
|
|
@ -739,7 +739,7 @@ void nano_qt::block_viewer::rebroadcast_action (nano::block_hash const & hash_a)
|
||||||
auto block (wallet.node.ledger.any.block_get (transaction, hash_a));
|
auto block (wallet.node.ledger.any.block_get (transaction, hash_a));
|
||||||
if (block != nullptr)
|
if (block != nullptr)
|
||||||
{
|
{
|
||||||
wallet.node.network.flood_block (block);
|
wallet.node.network.flood_block (block, nano::transport::traffic_type::block_broadcast_initial);
|
||||||
auto successor = wallet.node.ledger.any.block_successor (transaction, hash_a);
|
auto successor = wallet.node.ledger.any.block_successor (transaction, hash_a);
|
||||||
if (successor)
|
if (successor)
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -300,7 +300,7 @@ TEST (node, fork_storm)
|
||||||
auto open_result (node_i->process (open));
|
auto open_result (node_i->process (open));
|
||||||
ASSERT_EQ (nano::block_status::progress, open_result);
|
ASSERT_EQ (nano::block_status::progress, open_result);
|
||||||
auto transaction (node_i->store.tx_begin_read ());
|
auto transaction (node_i->store.tx_begin_read ());
|
||||||
node_i->network.flood_block (open);
|
node_i->network.flood_block (open, nano::transport::traffic_type::test);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
auto again (true);
|
auto again (true);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue