Creating confirmation_solicitor class. This class manages taking unconfirmed elections and creating confirmation_req bundles to the needed representatives.

This commit is contained in:
clemahieu 2019-12-28 15:43:59 -03:00
commit 41b3f5e327
No known key found for this signature in database
GPG key ID: 43708520C8DFB938
5 changed files with 127 additions and 172 deletions

View file

@ -43,6 +43,8 @@ add_library (node
common.cpp
confirmation_height_processor.hpp
confirmation_height_processor.cpp
confirmation_solicitor.cpp
confirmation_solicitor.hpp
daemonconfig.hpp
daemonconfig.cpp
distributed_work.hpp

View file

@ -18,6 +18,7 @@ election_time_to_live (node.network_params.network.is_test_network () ? 0s : 10s
multipliers_cb (20, 1.),
trended_active_difficulty (node.network_params.network.publish_threshold),
next_frontier_check (steady_clock::now ()),
solicitor (node_a),
thread ([this]() {
nano::thread_role::set (nano::thread_role::name::request_loop);
request_loop ();
@ -202,109 +203,11 @@ void nano::active_transactions::election_escalate (std::shared_ptr<nano::electio
}
}
void nano::active_transactions::election_broadcast (std::shared_ptr<nano::election> & election_l, nano::transaction const & transaction_l, std::deque<std::shared_ptr<nano::block>> & blocks_bundle_l, std::unordered_set<nano::qualified_root> & inactive_l, nano::qualified_root & root_l)
{
if (node.ledger.could_fit (transaction_l, *election_l->status.winner))
{
// Broadcast current winner
if (blocks_bundle_l.size () < max_block_broadcasts)
{
blocks_bundle_l.push_back (election_l->status.winner);
}
}
else if (election_l->confirmation_request_count != 0)
{
election_l->stop ();
inactive_l.insert (root_l);
}
}
bool nano::active_transactions::election_request_confirm (std::shared_ptr<nano::election> & election_l, std::vector<nano::representative> const & representatives_l, size_t const & roots_size_l,
std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>> & single_confirm_req_bundle_l,
std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>> & batched_confirm_req_bundle_l)
{
bool inserted_into_any_bundle{ false };
std::vector<std::shared_ptr<nano::transport::channel>> rep_channels_missing_vote_l;
// Add all rep endpoints that haven't already voted
for (const auto & rep : representatives_l)
{
if (election_l->last_votes.find (rep.account) == election_l->last_votes.end ())
{
rep_channels_missing_vote_l.push_back (rep.channel);
if (node.config.logging.vote_logging () && election_l->confirmation_request_count > 0)
{
node.logger.try_log ("Representative did not respond to confirm_req, retrying: ", rep.account.to_account ());
}
}
}
// Unique channels as there can be multiple reps per channel
rep_channels_missing_vote_l.erase (std::unique (rep_channels_missing_vote_l.begin (), rep_channels_missing_vote_l.end ()), rep_channels_missing_vote_l.end ());
bool low_reps_weight (rep_channels_missing_vote_l.empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ());
if (low_reps_weight && roots_size_l <= 5 && !node.network_params.network.is_test_network ())
{
// Spam mode
auto deque_l (node.network.udp_channels.random_set (100));
auto vec (std::make_shared<std::vector<std::shared_ptr<nano::transport::channel>>> ());
for (auto i : deque_l)
{
vec->push_back (i);
}
single_confirm_req_bundle_l.push_back (std::make_pair (election_l->status.winner, vec));
inserted_into_any_bundle = true;
}
else
{
auto single_confirm_req_channels_l (std::make_shared<std::vector<std::shared_ptr<nano::transport::channel>>> ());
for (auto & rep : rep_channels_missing_vote_l)
{
if (rep->get_network_version () >= node.network_params.protocol.tcp_realtime_protocol_version_min)
{
// Send batch request to peers supporting confirm_req by hash + root
auto rep_request_l (batched_confirm_req_bundle_l.find (rep));
auto block_l (election_l->status.winner);
auto root_hash_l (std::make_pair (block_l->hash (), block_l->root ()));
if (rep_request_l == batched_confirm_req_bundle_l.end ())
{
// Maximum number of representatives
if (batched_confirm_req_bundle_l.size () < max_confirm_representatives)
{
std::deque<std::pair<nano::block_hash, nano::root>> insert_root_hash = { root_hash_l };
batched_confirm_req_bundle_l.insert (std::make_pair (rep, insert_root_hash));
inserted_into_any_bundle = true;
}
}
// Maximum number of hashes
else if (rep_request_l->second.size () < max_confirm_req_batches * nano::network::confirm_req_hashes_max)
{
rep_request_l->second.push_back (root_hash_l);
inserted_into_any_bundle = true;
}
}
else
{
single_confirm_req_channels_l->push_back (rep);
}
}
// broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing
if (single_confirm_req_bundle_l.size () < max_confirm_req && !single_confirm_req_channels_l->empty ())
{
single_confirm_req_bundle_l.push_back (std::make_pair (election_l->status.winner, single_confirm_req_channels_l));
inserted_into_any_bundle = true;
}
}
return inserted_into_any_bundle;
}
void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> & lock_a)
{
assert (!mutex.try_lock ());
auto transaction_l (node.store.tx_begin_read ());
std::unordered_set<nano::qualified_root> inactive_l;
std::deque<std::shared_ptr<nano::block>> blocks_bundle_l;
std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>> batched_confirm_req_bundle_l;
std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>> single_confirm_req_bundle_l;
/*
* Confirm frontiers when there aren't many confirmations already pending and node finished initial bootstrap
* In auto mode start confirm only if node contains almost principal representative (half of required for principal weight)
@ -330,7 +233,6 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
// The lowest PoW difficulty elections have a maximum time to live if they are beyond the soft threshold size for the container
auto election_ttl_cutoff_l (std::chrono::steady_clock::now () - election_time_to_live);
auto const representatives_l (node.rep_crawler.representatives (std::numeric_limits<size_t>::max ()));
auto roots_size_l (roots.size ());
auto & sorted_roots_l = roots.get<tag_difficulty> ();
size_t count_l{ 0 };
@ -344,12 +246,17 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
* Elections extending the soft config.active_elections_size limit are flushed after a certain time-to-live cutoff
* Flushed elections are later re-activated via frontier confirmation
*/
solicitor.prepare ();
for (auto i = sorted_roots_l.begin (), n = sorted_roots_l.end (); i != n; ++i, ++count_l)
{
auto election_l (i->election);
auto root_l (i->root);
if (election_l->confirmed || (election_l->confirmation_request_count != 0 && !node.ledger.could_fit (transaction_l, *election_l->status.winner)))
{
election_l->stop ();
}
// Erase finished elections
if ((election_l->confirmed || election_l->stopped))
if ((election_l->stopped))
{
inactive_l.insert (root_l);
}
@ -363,74 +270,16 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
// Broadcast and request confirmation
else if (election_l->skip_delay || election_l->election_start < cutoff_l)
{
bool increment_counter_l{ true };
solicitor.add (election_l);
// Escalate long election after a certain time and number of requests performed
if (election_l->confirmation_request_count > 4 && election_l->election_start < long_election_cutoff_l)
{
election_escalate (election_l, transaction_l, roots_size_l);
}
// Block broadcasting
if (election_l->confirmation_request_count % 8 == 1)
{
election_broadcast (election_l, transaction_l, blocks_bundle_l, inactive_l, root_l);
}
// Confirmation requesting
if (election_l->confirmation_request_count % 4 == 0)
{
// If failed to insert into any of the bundles (capped), don't increment the counter so that the same root is sent for confirmation in the next loop
if (!election_request_confirm (election_l, representatives_l, roots_size_l, single_confirm_req_bundle_l, batched_confirm_req_bundle_l))
{
increment_counter_l = false;
}
}
if (increment_counter_l || node.network_params.network.is_test_network ())
{
++election_l->confirmation_request_count;
}
}
}
ongoing_broadcasts = !blocks_bundle_l.empty () + !batched_confirm_req_bundle_l.empty () + !single_confirm_req_bundle_l.empty ();
lock_a.unlock ();
// Rebroadcast unconfirmed blocks
if (!blocks_bundle_l.empty ())
{
node.network.flood_block_many (
std::move (blocks_bundle_l), [this]() {
{
nano::lock_guard<std::mutex> guard_l (this->mutex);
--this->ongoing_broadcasts;
}
this->condition.notify_all ();
},
10); // 10ms/block * 30blocks = 300ms < 500ms
}
// Batch confirmation request
if (!batched_confirm_req_bundle_l.empty ())
{
node.network.broadcast_confirm_req_batched_many (
batched_confirm_req_bundle_l, [this]() {
{
nano::lock_guard<std::mutex> guard_l (this->mutex);
--this->ongoing_broadcasts;
}
this->condition.notify_all ();
},
15); // 15ms/batch * 20batches = 300ms < 500ms
}
// Single confirmation requests
if (!single_confirm_req_bundle_l.empty ())
{
node.network.broadcast_confirm_req_many (
single_confirm_req_bundle_l, [this]() {
{
nano::lock_guard<std::mutex> guard_l (this->mutex);
--this->ongoing_broadcasts;
}
this->condition.notify_all ();
},
30); // 30~60ms/req * 5 reqs = 150~300ms < 500ms
}
solicitor.flush ();
lock_a.lock ();
// Erase inactive elections
for (auto i (inactive_l.begin ()), n (inactive_l.end ()); i != n; ++i)
@ -467,10 +316,6 @@ void nano::active_transactions::request_loop ()
request_confirm (lock);
// Sleep until all broadcasts are done, plus the remaining loop time
while (!stopped && ongoing_broadcasts)
{
condition.wait (lock);
}
if (!stopped)
{
// clang-format off

View file

@ -1,6 +1,7 @@
#pragma once
#include <nano/lib/numbers.hpp>
#include <nano/node/confirmation_solicitor.hpp>
#include <nano/node/election.hpp>
#include <nano/node/gap_cache.hpp>
#include <nano/node/repcrawler.hpp>
@ -121,10 +122,7 @@ public:
std::chrono::milliseconds const election_request_delay;
// Maximum time an election can be kept active if it is extending the container
std::chrono::seconds const election_time_to_live;
static size_t constexpr max_block_broadcasts = 30;
static size_t constexpr max_confirm_representatives = 30;
static size_t constexpr max_confirm_req_batches = 20;
static size_t constexpr max_confirm_req = 5;
boost::circular_buffer<double> multipliers_cb;
uint64_t trended_active_difficulty;
size_t priority_cementable_frontiers_size ();
@ -145,17 +143,12 @@ private:
void request_loop ();
void search_frontiers (nano::transaction const &);
void election_escalate (std::shared_ptr<nano::election> &, nano::transaction const &, size_t const &);
void election_broadcast (std::shared_ptr<nano::election> &, nano::transaction const &, std::deque<std::shared_ptr<nano::block>> &, std::unordered_set<nano::qualified_root> &, nano::qualified_root &);
bool election_request_confirm (std::shared_ptr<nano::election> &, std::vector<nano::representative> const &, size_t const &,
std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>> & single_confirm_req_bundle_l,
std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>> & batched_confirm_req_bundle_l);
void request_confirm (nano::unique_lock<std::mutex> &);
nano::account next_frontier_account{ 0 };
std::chrono::steady_clock::time_point next_frontier_check{ std::chrono::steady_clock::now () };
nano::condition_variable condition;
bool started{ false };
std::atomic<bool> stopped{ false };
unsigned ongoing_broadcasts{ 0 };
// clang-format off
boost::multi_index_container<nano::qualified_root,
mi::indexed_by<
@ -189,6 +182,7 @@ private:
mi::hashed_unique<mi::tag<tag_root>,
mi::member<nano::election_timepoint, nano::qualified_root, &nano::election_timepoint::root>>>>
dropped_elections_cache;
nano::confirmation_solicitor solicitor;
// clang-format on
static size_t constexpr dropped_elections_cache_max{ 32 * 1024 };
boost::thread thread;

View file

@ -0,0 +1,63 @@
#include <nano/node/confirmation_solicitor.hpp>
#include <nano/node/election.hpp>
#include <nano/node/node.hpp>
nano::confirmation_solicitor::confirmation_solicitor (nano::node & node_a) :
node (node_a)
{
}
void nano::confirmation_solicitor::prepare ()
{
assert (!prepared);
requests.clear ();
rebroadcasted = 0;
representatives = node.rep_crawler.representatives ();
prepared = true;
}
void nano::confirmation_solicitor::add (std::shared_ptr<nano::election> election_a)
{
assert (prepared);
if (election_a->confirmation_request_count % 8 == 1 && rebroadcasted++ < max_block_broadcasts)
{
node.network.flood_block (election_a->status.winner);
}
for (auto const & rep : representatives)
{
if (election_a->last_votes.find (rep.account) == election_a->last_votes.end ())
{
requests.insert ({ rep.channel, election_a });
}
}
++election_a->confirmation_request_count;
}
void nano::confirmation_solicitor::flush ()
{
assert (prepared);
size_t batch_count = 0;
size_t single_count = 0;
for (auto i = requests.begin (), n (requests.end ()); i != n;)
{
if (batch_count++ < max_confirm_req_batches && i->channel->get_network_version () >= node.network_params.protocol.tcp_realtime_protocol_version_min)
{
auto channel = i->channel;
std::vector<std::pair<nano::block_hash, nano::root>> roots_hashes_l;
while (i != n && i->channel == channel && roots_hashes_l.size () < nano::network::confirm_req_hashes_max)
{
roots_hashes_l.push_back (std::make_pair (i->election->status.winner->hash (), i->election->status.winner->root ()));
++i;
}
nano::confirm_req req (roots_hashes_l);
channel->send (req);
}
else if (single_count++ < max_confirm_req)
{
node.network.broadcast_confirm_req (i->election->status.winner);
++i;
}
}
prepared = false;
}

View file

@ -0,0 +1,51 @@
#pragma once
#include <nano/node/repcrawler.hpp>
#include <unordered_map>
namespace nano
{
class election;
class node;
/** This class accepts elections that need further votes before they can be confirmed and bundles them in to single confirm_req packets */
class confirmation_solicitor final
{
class request
{
public:
std::shared_ptr<nano::transport::channel> channel;
std::shared_ptr<nano::election> election;
bool operator== (nano::confirmation_solicitor::request const & other_a) const
{
return *channel == *other_a.channel && election == other_a.election;
}
};
class request_hash
{
public:
size_t operator () (nano::confirmation_solicitor::request const & item_a) const
{
return std::hash<std::shared_ptr<nano::election>> ()(item_a.election) ^ std::hash<nano::transport::channel> ()(*item_a.channel);
}
};
public:
confirmation_solicitor (nano::node &);
/** Prepare object for batching election confirmation requests*/
void prepare ();
/** Add an election that needs to be confirmed */
void add (std::shared_ptr<nano::election>);
/** Bundle hashes together for identical channels in to a single confirm_req by hash packet */
void flush ();
private:
static size_t constexpr max_confirm_req_batches = 20;
static size_t constexpr max_confirm_req = 5;
static size_t constexpr max_block_broadcasts = 30;
int rebroadcasted { 0 };
nano::node & node;
std::vector<nano::representative> representatives;
/** Unique channel/hash to be requested */
std::unordered_set<request, nano::confirmation_solicitor::request_hash> requests;
bool prepared { false };
};
}