Tracking outgoing packet statistics.

This commit is contained in:
clemahieu 2017-07-04 13:12:24 -05:00
commit e3cec6c7ee
5 changed files with 43 additions and 29 deletions

View file

@ -16,6 +16,6 @@ enum class rai_networks
};
rai::rai_networks const rai_network = rai_networks::ACTIVE_NETWORK;
int const database_check_interval = rai_network == rai::rai_networks::rai_test_network ? 64 : 256;
size_t const database_size_increment = rai_network == rai::rai_networks::rai_test_network ? 2 * 1024 * 1024 : 256 * 1024 * 1024;
size_t const blocks_per_transaction = rai::rai_network == rai::rai_networks::rai_test_network ? 2 : 2048;
size_t const database_size_increment = rai_network == rai::rai_networks::rai_test_network ? 2 * 1024 * 1024 : 1024 * 1024 * 1024;
size_t const blocks_per_transaction = rai::rai_network == rai::rai_networks::rai_test_network ? 2 : 16384;
}

View file

@ -70,11 +70,11 @@ TEST (network, send_keepalive)
auto node1 (std::make_shared <rai::node> (init1, system.service, 24001, rai::unique_path (), system.alarm, system.logging, system.work));
node1->start ();
system.nodes [0]->network.send_keepalive (node1->network.endpoint ());
auto initial (system.nodes [0]->network.keepalive_count);
auto initial (system.nodes [0]->network.incoming.keepalive.load ());
ASSERT_EQ (0, system.nodes [0]->peers.list ().size ());
ASSERT_EQ (0, node1->peers.list ().size ());
auto iterations (0);
while (system.nodes [0]->network.keepalive_count == initial)
while (system.nodes [0]->network.incoming.keepalive == initial)
{
system.poll ();
++iterations;
@ -98,9 +98,9 @@ TEST (network, keepalive_ipv4)
auto node1 (std::make_shared <rai::node> (init1, system.service, 24001, rai::unique_path (), system.alarm, system.logging, system.work));
node1->start ();
node1->send_keepalive (rai::endpoint (boost::asio::ip::address_v4::loopback (), 24000));
auto initial (system.nodes [0]->network.keepalive_count);
auto initial (system.nodes [0]->network.incoming.keepalive.load ());
auto iterations (0);
while (system.nodes [0]->network.keepalive_count == initial)
while (system.nodes [0]->network.incoming.keepalive == initial)
{
system.poll ();
++iterations;
@ -154,7 +154,7 @@ TEST (network, send_discarded_publish)
ASSERT_EQ (genesis.hash (), system.nodes [0]->latest (rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub));
auto iterations (0);
while (system.nodes [1]->network.publish_count == 0)
while (system.nodes [1]->network.incoming.publish == 0)
{
system.poll ();
++iterations;
@ -173,7 +173,7 @@ TEST (network, send_invalid_publish)
ASSERT_EQ (genesis.hash (), system.nodes [0]->latest (rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub));
auto iterations (0);
while (system.nodes [1]->network.publish_count == 0)
while (system.nodes [1]->network.incoming.publish == 0)
{
system.poll ();
++iterations;
@ -217,7 +217,7 @@ TEST (network, send_valid_publish)
rai::block_hash latest2 (system.nodes [1]->latest (rai::test_genesis_key.pub));
system.nodes [1]->process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (block2)));
auto iterations (0);
while (system.nodes [0]->network.publish_count == 0)
while (system.nodes [0]->network.incoming.publish == 0)
{
system.poll ();
++iterations;

View file

@ -1029,7 +1029,7 @@ TEST (node, fork_no_vote_quorum)
confirm.serialize (stream);
}
node2.network.confirm_send (confirm, bytes, node3.network.endpoint ());
while (node3.network.confirm_ack_count < 3)
while (node3.network.incoming.confirm_ack < 3)
{
system.poll ();
}
@ -1298,7 +1298,7 @@ TEST (node, no_voting)
++iterations;
ASSERT_GT (200, iterations);
}
ASSERT_EQ (0, node1.network.confirm_ack_count);
ASSERT_EQ (0, node1.network.incoming.confirm_ack);
}
TEST (node, start_observer)

View file

@ -31,6 +31,14 @@ int constexpr rai::port_mapping::mapping_timeout;
int constexpr rai::port_mapping::check_timeout;
unsigned constexpr rai::active_transactions::announce_interval_ms;
rai::message_statistics::message_statistics () :
keepalive (0),
publish (0),
confirm_req (0),
confirm_ack (0)
{
}
rai::network::network (boost::asio::io_service & service_a, uint16_t port, rai::node & node_a) :
socket (service_a, rai::endpoint (boost::asio::ip::address_v6::any (), port)),
service (service_a),
@ -38,10 +46,6 @@ resolver (service_a),
node (node_a),
bad_sender_count (0),
on (true),
keepalive_count (0),
publish_count (0),
confirm_req_count (0),
confirm_ack_count (0),
insufficient_work_count (0),
error_count (0)
{
@ -54,11 +58,10 @@ void rai::network::receive ()
BOOST_LOG (node.log) << "Receiving packet";
}
std::unique_lock <std::mutex> lock (socket_mutex);
socket.async_receive_from (boost::asio::buffer (buffer.data (), buffer.size ()), remote,
[this] (boost::system::error_code const & error, size_t size_a)
{
receive_action (error, size_a);
});
socket.async_receive_from (boost::asio::buffer (buffer.data (), buffer.size ()), remote, [this] (boost::system::error_code const & error, size_t size_a)
{
receive_action (error, size_a);
});
}
void rai::network::stop ()
@ -82,7 +85,8 @@ void rai::network::send_keepalive (rai::endpoint const & endpoint_a)
{
BOOST_LOG (node.log) << boost::str (boost::format ("Keepalive req sent to %1%") % endpoint_a);
}
std::weak_ptr <rai::node> node_w (node.shared ());
++outgoing.keepalive;
std::weak_ptr <rai::node> node_w (node.shared ());
send_buffer (bytes->data (), bytes->size (), endpoint_a, [bytes, node_w, endpoint_a] (boost::system::error_code const & ec, size_t)
{
if (auto node_l = node_w.lock ())
@ -124,6 +128,7 @@ void rai::node::keepalive (std::string const & address_a, uint16_t port_a)
void rai::network::republish (rai::block_hash const & hash_a, std::shared_ptr <std::vector <uint8_t>> buffer_a, rai::endpoint endpoint_a)
{
++outgoing.publish;
if (node.config.logging.network_publish_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Publishing %1% to %2%") % hash_a.to_string () % endpoint_a);
@ -281,6 +286,7 @@ void rai::network::send_confirm_req (rai::endpoint const & endpoint_a, rai::bloc
BOOST_LOG (node.log) << boost::str (boost::format ("Sending confirm req to %1%") % endpoint_a);
}
std::weak_ptr <rai::node> node_w (node.shared ());
++outgoing.confirm_req;
send_buffer (bytes->data (), bytes->size (), endpoint_a, [bytes, node_w] (boost::system::error_code const & ec, size_t size)
{
if (auto node_l = node_w.lock ())
@ -342,7 +348,7 @@ public:
{
BOOST_LOG (node.log) << boost::str (boost::format ("Received keepalive message from %1%") % sender);
}
++node.network.keepalive_count;
++node.network.incoming.keepalive;
node.peers.contacted (sender);
node.network.merge_peers (message_a.peers);
}
@ -352,7 +358,7 @@ public:
{
BOOST_LOG (node.log) << boost::str (boost::format ("Publish message from %1% for %2%") % sender % message_a.block->hash ().to_string ());
}
++node.network.publish_count;
++node.network.incoming.publish;
node.peers.contacted (sender);
node.peers.insert (sender);
node.process_receive_republish (message_a.block->clone ());
@ -363,7 +369,7 @@ public:
{
BOOST_LOG (node.log) << boost::str (boost::format ("Confirm_req message from %1% for %2%") % sender % message_a.block->hash ().to_string ());
}
++node.network.confirm_req_count;
++node.network.incoming.confirm_req;
node.peers.contacted (sender);
node.peers.insert (sender);
node.process_receive_republish (message_a.block->clone ());
@ -378,7 +384,7 @@ public:
{
BOOST_LOG (node.log) << boost::str (boost::format ("Received confirm_ack message from %1% for %2%") % sender % message_a.vote.block->hash ().to_string ());
}
++node.network.confirm_ack_count;
++node.network.incoming.confirm_ack;
node.peers.contacted (sender);
node.peers.insert (sender);
node.process_receive_republish (message_a.vote.block->clone ());
@ -1328,6 +1334,7 @@ void rai::network::confirm_send (rai::confirm_ack const & confirm_a, std::shared
BOOST_LOG (node.log) << boost::str (boost::format ("Sending confirm_ack for block %1% to %2%") % confirm_a.vote.block->hash ().to_string () % endpoint_a);
}
std::weak_ptr <rai::node> node_w (node.shared ());
++outgoing.confirm_ack;
node.network.send_buffer (bytes_a->data (), bytes_a->size (), endpoint_a, [bytes_a, node_w, endpoint_a] (boost::system::error_code const & ec, size_t size_a)
{
if (auto node_l = node_w.lock ())

View file

@ -256,6 +256,15 @@ public:
uint64_t check_count;
bool on;
};
class message_statistics
{
public:
message_statistics ();
std::atomic <uint64_t> keepalive;
std::atomic <uint64_t> publish;
std::atomic <uint64_t> confirm_req;
std::atomic <uint64_t> confirm_ack;
};
class network
{
public:
@ -285,12 +294,10 @@ public:
rai::node & node;
uint64_t bad_sender_count;
bool on;
uint64_t keepalive_count;
uint64_t publish_count;
uint64_t confirm_req_count;
uint64_t confirm_ack_count;
uint64_t insufficient_work_count;
uint64_t error_count;
rai::message_statistics incoming;
rai::message_statistics outgoing;
static uint16_t const node_port = rai::rai_network == rai::rai_networks::rai_live_network ? 7075 : 54000;
};
class logging