Removing send message buffer. Originally this was a measure to stop nodes from behaving poorly and flooding the network with too much traffic. The network code has been improved to a point where this doesn't seem like a concern.

This commit is contained in:
clemahieu 2017-04-01 22:03:03 -05:00
commit 4fa8d80583
4 changed files with 13 additions and 48 deletions

View file

@ -191,19 +191,17 @@ TEST (network, send_valid_confirm_ack)
system.wallet (1)->insert_adhoc (key2.prv);
rai::block_hash latest1 (system.nodes [0]->latest (rai::test_genesis_key.pub));
rai::send_block block2 (latest1, key2.pub, 50, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest1));
auto hash2 (block2.hash ());
rai::block_hash latest2 (system.nodes [1]->latest (rai::test_genesis_key.pub));
system.nodes [0]->process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (block2)));
auto iterations (0);
while (system.nodes [1]->network.confirm_ack_count == 0)
// Keep polling until latest block changes
while (system.nodes [1]->latest (rai::test_genesis_key.pub) == latest2)
{
system.poll ();
++iterations;
ASSERT_LT (iterations, 200);
}
rai::block_hash latest3 (system.nodes [1]->latest (rai::test_genesis_key.pub));
ASSERT_NE (latest2, latest3);
ASSERT_EQ (hash2, latest3);
// Make sure the balance has decreased after procssing the block.
ASSERT_EQ (50, system.nodes [1]->balance (rai::test_genesis_key.pub));
}

View file

@ -1260,7 +1260,7 @@ TEST (node, rep_list)
{
if (reps [0].endpoint == node0.network.endpoint ())
{
if (reps [0].rep_weight == rai::genesis_amount - rai::Mrai_ratio)
if (!reps [0].rep_weight.is_zero ())
{
done = true;
}

View file

@ -2101,9 +2101,12 @@ std::vector <rai::peer_information> rai::peer_container::representatives (size_t
std::vector <peer_information> result;
result.reserve (count_a);
std::lock_guard <std::mutex> lock (mutex);
for (auto i (peers.get <6> ().begin ()), n (peers.get <6> ().end ()); i != n && result.size () < count_a && !i->rep_weight.is_zero (); ++i)
for (auto i (peers.get <6> ().begin ()), n (peers.get <6> ().end ()); i != n && result.size () < count_a; ++i)
{
result.push_back (*i);
if (!i->rep_weight.is_zero ())
{
result.push_back (*i);
}
}
return result;
}
@ -2356,57 +2359,23 @@ std::ostream & operator << (std::ostream & stream_a, std::chrono::system_clock::
return stream_a;
}
void rai::network::initiate_send ()
void rai::network::send_buffer (uint8_t const * data_a, size_t size_a, rai::endpoint const & endpoint_a, std::function <void (boost::system::error_code const &, size_t)> callback_a)
{
assert (!socket_mutex.try_lock ());
assert (!sends.empty ());
auto & front (sends.front ());
std::unique_lock <std::mutex> lock (socket_mutex);
if (node.config.logging.network_packet_logging ())
{
BOOST_LOG (node.log) << "Sending packet";
}
socket.async_send_to (boost::asio::buffer (front.data, front.size), front.endpoint, [this, front] (boost::system::error_code const & ec, size_t size_a)
socket.async_send_to (boost::asio::buffer (data_a, size_a), endpoint_a, [this, callback_a] (boost::system::error_code const & ec, size_t size_a)
{
rai::send_info self;
bool empty;
{
std::unique_lock <std::mutex> lock (socket_mutex);
assert (!sends.empty ());
self = sends.front ();
sends.pop ();
empty = sends.empty ();
}
self.callback (ec, size_a);
callback_a (ec, size_a);
if (node.config.logging.network_packet_logging ())
{
BOOST_LOG (node.log) << "Packet send complete";
}
if (!empty)
{
if (node.config.logging.network_packet_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Delaying next packet send %1% microseconds") % node.config.packet_delay_microseconds);
}
node.alarm.add (std::chrono::system_clock::now () + std::chrono::microseconds (node.config.packet_delay_microseconds), [this] ()
{
std::unique_lock <std::mutex> lock (socket_mutex);
initiate_send ();
});
}
});
}
void rai::network::send_buffer (uint8_t const * data_a, size_t size_a, rai::endpoint const & endpoint_a, std::function <void (boost::system::error_code const &, size_t)> callback_a)
{
std::unique_lock <std::mutex> lock (socket_mutex);
auto initiate (sends.empty ());
sends.push ({data_a, size_a, endpoint_a, callback_a});
if (initiate)
{
initiate_send ();
}
}
uint64_t rai::block_store::now ()
{
boost::posix_time::ptime epoch (boost::gregorian::date (1970, 1, 1));

View file

@ -277,7 +277,6 @@ public:
void send_keepalive (rai::endpoint const &);
void broadcast_confirm_req (rai::block const &);
void send_confirm_req (rai::endpoint const &, rai::block const &);
void initiate_send ();
void send_buffer (uint8_t const *, size_t, rai::endpoint const &, std::function <void (boost::system::error_code const &, size_t)>);
rai::endpoint endpoint ();
rai::endpoint remote;
@ -288,7 +287,6 @@ public:
boost::asio::ip::udp::resolver resolver;
rai::node & node;
uint64_t bad_sender_count;
std::queue <rai::send_info> sends;
bool on;
uint64_t keepalive_count;
uint64_t publish_count;