Broadcast confirm_req in batches (#1243)

This commit is contained in:
SergiySW 2018-11-19 20:15:15 +03:00 committed by Roy Keene
commit 1247e92f67
4 changed files with 44 additions and 11 deletions

View file

@ -324,7 +324,7 @@ void rai::network::republish_block_batch (std::deque<std::shared_ptr<rai::block>
if (!blocks_a.empty ())
{
std::weak_ptr<rai::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, blocks_a, delay_a]() {
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks_a, delay_a]() {
if (auto node_l = node_w.lock ())
{
node_l->network.republish_block_batch (blocks_a, delay_a);
@ -361,7 +361,7 @@ void rai::network::broadcast_confirm_req (std::shared_ptr<rai::block> block_a)
if (list->empty () || node.peers.total_weight () < node.config.online_weight_minimum.number ())
{
// broadcast request to all peers
list = std::make_shared<std::vector<rai::peer_information>> (node.peers.list_vector ());
list = std::make_shared<std::vector<rai::peer_information>> (node.peers.list_vector (100));
}
/*
@ -410,6 +410,31 @@ void rai::network::broadcast_confirm_req_base (std::shared_ptr<rai::block> block
}
}
void rai::network::broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<rai::block>, std::shared_ptr<std::vector<rai::peer_information>>>> deque_a, unsigned delay_a)
{
auto pair (deque_a.front ());
deque_a.pop_front ();
auto block (pair.first);
// confirm_req to representatives
auto endpoints (pair.second);
if (!endpoints->empty ())
{
broadcast_confirm_req_base (block, endpoints, delay_a);
}
/* Continue while blocks remain
Broadcast with random delay between delay_a & 2*delay_a */
if (!deque_a.empty ())
{
std::weak_ptr<rai::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, deque_a, delay_a]() {
if (auto node_l = node_w.lock ())
{
node_l->network.broadcast_confirm_req_batch (deque_a, delay_a);
}
});
}
}
void rai::network::send_confirm_req (rai::endpoint const & endpoint_a, std::shared_ptr<rai::block> block)
{
rai::confirm_req message (block);
@ -2920,8 +2945,8 @@ void rai::active_transactions::announce_votes (std::unique_lock<std::mutex> & lo
auto transaction (node.store.tx_begin_read ());
unsigned unconfirmed_count (0);
unsigned unconfirmed_announcements (0);
unsigned mass_request_count (0);
std::deque<std::shared_ptr<rai::block>> rebroadcast_bundle;
std::deque<std::pair<std::shared_ptr<rai::block>, std::shared_ptr<std::vector<rai::peer_information>>>> confirm_req_bundle;
auto roots_size (roots.size ());
for (auto i (roots.begin ()), n (roots.end ()); i != n; ++i)
@ -3028,16 +3053,14 @@ void rai::active_transactions::announce_votes (std::unique_lock<std::mutex> & lo
}
}
}
if (!reps->empty () && (total_weight > node.config.online_weight_minimum.number () || mass_request_count > 20))
if ((!reps->empty () && total_weight > node.config.online_weight_minimum.number ()) || roots.size () > 5)
{
// broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing
node.network.broadcast_confirm_req_base (i->confirm_req_options.first, std::make_shared<std::vector<rai::peer_information>> (*reps), 0);
confirm_req_bundle.push_back (std::make_pair (i->confirm_req_options.first, reps));
}
else
{
// broadcast request to all peers
node.network.broadcast_confirm_req_base (i->confirm_req_options.first, std::make_shared<std::vector<rai::peer_information>> (node.peers.list_vector ()), 0);
++mass_request_count;
confirm_req_bundle.push_back (std::make_pair (i->confirm_req_options.first, std::make_shared<std::vector<rai::peer_information>> (node.peers.list_vector (100))));
}
}
}
@ -3049,6 +3072,11 @@ void rai::active_transactions::announce_votes (std::unique_lock<std::mutex> & lo
{
node.network.republish_block_batch (rebroadcast_bundle);
}
//confirm_req broadcast
if (!confirm_req_bundle.empty ())
{
node.network.broadcast_confirm_req_batch (confirm_req_bundle);
}
for (auto i (inactive.begin ()), n (inactive.end ()); i != n; ++i)
{
auto root_it (roots.find (*i));
@ -3082,7 +3110,7 @@ void rai::active_transactions::announce_loop ()
while (!stopped)
{
announce_votes (lock);
condition.wait_for (lock, std::chrono::milliseconds (announce_interval_ms + roots.size () * node.network.broadcast_interval_ms));
condition.wait_for (lock, std::chrono::milliseconds (announce_interval_ms + roots.size () * node.network.broadcast_interval_ms * 3 / 2));
}
}

View file

@ -302,6 +302,7 @@ public:
void send_node_id_handshake (rai::endpoint const &, boost::optional<rai::uint256_union> const & query, boost::optional<rai::uint256_union> const & respond_to);
void broadcast_confirm_req (std::shared_ptr<rai::block>);
void broadcast_confirm_req_base (std::shared_ptr<rai::block>, std::shared_ptr<std::vector<rai::peer_information>>, unsigned, bool = false);
void broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<rai::block>, std::shared_ptr<std::vector<rai::peer_information>>>>, unsigned = broadcast_interval_ms);
void send_confirm_req (rai::endpoint const &, std::shared_ptr<rai::block>);
void send_buffer (uint8_t const *, size_t, rai::endpoint const &, std::function<void(boost::system::error_code const &, size_t)>);
rai::endpoint endpoint ();

View file

@ -107,7 +107,7 @@ std::map<rai::endpoint, unsigned> rai::peer_container::list_version ()
return result;
}
std::vector<rai::peer_information> rai::peer_container::list_vector ()
std::vector<rai::peer_information> rai::peer_container::list_vector (size_t count_a)
{
std::vector<peer_information> result;
std::lock_guard<std::mutex> lock (mutex);
@ -116,6 +116,10 @@ std::vector<rai::peer_information> rai::peer_container::list_vector ()
result.push_back (*i);
}
std::random_shuffle (result.begin (), result.end ());
if (result.size () > count_a)
{
result.resize (count_a, rai::peer_information (rai::endpoint{}, 0));
}
return result;
}

View file

@ -80,7 +80,7 @@ public:
// List of all peers
std::deque<rai::endpoint> list ();
std::map<rai::endpoint, unsigned> list_version ();
std::vector<peer_information> list_vector ();
std::vector<peer_information> list_vector (size_t);
// A list of random peers sized for the configured rebroadcast fanout
std::deque<rai::endpoint> list_fanout ();
// Returns a list of probable reps and their weight