Confirmation solicitor revamp (#2472)

* Optional minimum version when querying representatives from crawler

* Revamping confirmation_solicitor to mimic previous active_transactions behavior

* Use a time-based approach to throttle confirmation requests and block flooding

* Addressing Wesley review

* Remove unused node.hpp include (thanks wes)

* Simplify logic by using unordered_map::operator[] which calls the default constructor if not found

* Split solicitor add into broadcast+add and bring back the logic to active_transactions

This brings the rate-limiting logic and the modification of election variables back into active_transactions only. Timings are also slightly adjusted:

- Only 2 requests required before starting to flood blocks
- Timings for test network

* Rename flag

* Only broadcast OR request confirmation in the same loop for the same election

* Enclose lambda in clang-format off
This commit is contained in:
Guilherme Lawless 2020-01-15 15:05:23 +00:00 committed by GitHub
commit 0bc041e051
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 181 additions and 84 deletions

View file

@ -6,6 +6,7 @@ add_executable (core_test
block_store.cpp
bootstrap.cpp
confirmation_height.cpp
confirmation_solicitor.cpp
conflicts.cpp
difficulty.cpp
distributed_work.cpp

View file

@ -0,0 +1,60 @@
#include <nano/core_test/testutil.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <nano/node/testing.hpp>
#include <gtest/gtest.h>
using namespace std::chrono_literals;
/**
 * Exercises the per-channel batching limit of the confirmation solicitor.
 *
 * With a single known representative, the test fills that representative's request
 * queue, checks that one more add is rejected, checks that a block broadcast is still
 * accepted, and finally checks that flush () results in an outgoing confirm_req.
 */
TEST (confirmation_solicitor, batches)
{
	nano::system system;
	nano::node_config node_config (nano::get_available_port (), system.logging);
	node_config.enable_voting = false;
	node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
	auto & node1 = *system.add_node (node_config);
	auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
	// Solicitor will only solicit from this representative
	nano::representative representative (nano::test_genesis_key.pub, nano::genesis_amount, channel1);
	node_config.peering_port = nano::get_available_port ();
	nano::node_flags node_flags;
	// To prevent races on the solicitor
	node_flags.disable_request_loop = true;
	auto & node2 = *system.add_node (node_config, node_flags);
	// Lock active_transactions which uses the solicitor
	{
		nano::lock_guard<std::mutex> active_guard (node2.active.mutex);
		std::vector<nano::representative> representatives{ representative };
		node2.active.solicitor.prepare (representatives);
		// Ensure the representatives are correct
		ASSERT_EQ (1, representatives.size ());
		ASSERT_EQ (channel1, representatives.front ().channel);
		ASSERT_EQ (nano::test_genesis_key.pub, representatives.front ().account);
		nano::genesis genesis;
		auto send (std::make_shared<nano::send_block> (genesis.open->hash (), nano::keypair ().pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.open->hash ())));
		// Every add up to the per-channel capacity must be accepted (add returns false on success)
		for (size_t i (0); i < nano::network::confirm_req_hashes_max; ++i)
		{
			auto election (std::make_shared<nano::election> (node2, send, false, nullptr));
			ASSERT_FALSE (node2.active.solicitor.add (*election));
		}
		// The loop above fills the channel only because a single batch is expected here
		ASSERT_EQ (1, node2.active.solicitor.max_confirm_req_batches);
		// Reached the maximum amount of requests for the channel
		auto election (std::make_shared<nano::election> (node2, send, false, nullptr));
		ASSERT_TRUE (node2.active.solicitor.add (*election));
		// Broadcasting should be immediate
		ASSERT_EQ (0, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out));
		ASSERT_FALSE (node2.active.solicitor.broadcast (*election));
		// Bound the wait so a missing publish fails the test instead of polling indefinitely
		system.deadline_set (5s);
		while (node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out) < 1)
		{
			ASSERT_NO_ERROR (system.poll ());
		}
	}
	// From rep crawler
	ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::out));
	system.deadline_set (5s);
	// Flushing dispatches the queued requests, which must show up as a second outgoing confirm_req
	node2.active.solicitor.flush ();
	while (node2.stats.count (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::out) < 2)
	{
		ASSERT_NO_ERROR (system.poll ());
	}
}

View file

@ -73,7 +73,7 @@ public:
default_rpc_port = is_live_network () ? 7076 : is_beta_network () ? 55000 : 45000;
default_ipc_port = is_live_network () ? 7077 : is_beta_network () ? 56000 : 46000;
default_websocket_port = is_live_network () ? 7078 : is_beta_network () ? 57000 : 47000;
request_interval_ms = is_test_network () ? (is_sanitizer_build ? 100 : 20) : 500;
request_interval_ms = is_test_network () ? 20 : 500;
}
/** Network work thresholds. ~5 seconds of work for the live network */

View file

@ -12,18 +12,25 @@ using namespace std::chrono;
nano::active_transactions::active_transactions (nano::node & node_a) :
node (node_a),
long_election_threshold (node.network_params.network.is_test_network () ? 2s : 24s),
election_request_delay (node.network_params.network.is_test_network () ? 0s : 1s),
election_time_to_live (node.network_params.network.is_test_network () ? 0s : 10s),
multipliers_cb (20, 1.),
trended_active_difficulty (node.network_params.network.publish_threshold),
trended_active_difficulty (node_a.network_params.network.publish_threshold),
next_frontier_check (steady_clock::now ()),
solicitor (node_a),
solicitor (node_a.network, node_a.network_params.network),
long_election_threshold (node_a.network_params.network.is_test_network () ? 2s : 24s),
election_request_delay (node_a.network_params.network.is_test_network () ? 0s : 1s),
election_time_to_live (node_a.network_params.network.is_test_network () ? 0s : 10s),
min_time_between_requests (node_a.network_params.network.is_test_network () ? 25ms : 3s),
min_time_between_floods (node_a.network_params.network.is_test_network () ? 50ms : 6s),
min_request_count_flood (node_a.network_params.network.is_test_network () ? 0 : 2),
// clang-format off
thread ([this]() {
nano::thread_role::set (nano::thread_role::name::request_loop);
request_loop ();
})
// clang-format on
{
assert (min_time_between_requests > std::chrono::milliseconds (node.network_params.network.request_interval_ms));
assert (min_time_between_floods > std::chrono::milliseconds (node.network_params.network.request_interval_ms));
nano::unique_lock<std::mutex> lock (mutex);
condition.wait (lock, [& started = started] { return started; });
}
@ -156,7 +163,7 @@ void nano::active_transactions::post_confirmation_height_set (nano::transaction
void nano::active_transactions::election_escalate (std::shared_ptr<nano::election> & election_l, nano::transaction const & transaction_l, size_t const & roots_size_l)
{
static unsigned constexpr high_confirmation_request_count{ 128 };
constexpr unsigned high_confirmation_request_count{ 128 };
// Log votes for very long unconfirmed elections
if (election_l->confirmation_request_count % (4 * high_confirmation_request_count) == 1)
{
@ -225,28 +232,32 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
lock_a.lock ();
}
}
auto const now (std::chrono::steady_clock::now ());
// Any new election started from process_live only gets requests after at least 1 second
auto cutoff_l (std::chrono::steady_clock::now () - election_request_delay);
auto cutoff_l (now - election_request_delay);
// Elections taking too long get escalated
auto long_election_cutoff_l (std::chrono::steady_clock::now () - long_election_threshold);
auto long_election_cutoff_l (now - long_election_threshold);
// The lowest PoW difficulty elections have a maximum time to live if they are beyond the soft threshold size for the container
auto election_ttl_cutoff_l (std::chrono::steady_clock::now () - election_time_to_live);
auto election_ttl_cutoff_l (now - election_time_to_live);
// Rate-limiting floods
auto const flood_cutoff (now - min_time_between_floods);
// Rate-limiting confirmation requests
auto const request_cutoff (now - min_time_between_requests);
auto roots_size_l (roots.size ());
auto & sorted_roots_l = roots.get<tag_difficulty> ();
size_t count_l{ 0 };
// Only representatives ready to receive batched confirm_req
solicitor.prepare (node.rep_crawler.representatives (node.network_params.protocol.tcp_realtime_protocol_version_min));
/*
* Loop through active elections in descending order of proof-of-work difficulty, requesting confirmation
*
* Only up to a certain amount of elections are queued for confirmation request and block rebroadcasting. The remaining elections can still be confirmed if votes arrive
* We avoid selecting the same elections repeatedly in the next loops, through a modulo on confirmation_request_count
* An election only gets confirmation_request_count increased after the first confirm_req; after that it is increased every loop unless they don't fit in the queues
* Elections extending the soft config.active_elections_size limit are flushed after a certain time-to-live cutoff
* Flushed elections are later re-activated via frontier confirmation
*/
solicitor.prepare ();
for (auto i = sorted_roots_l.begin (), n = sorted_roots_l.end (); i != n; ++i, ++count_l)
{
auto election_l (i->election);
@ -267,10 +278,20 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
inactive_l.insert (root_l);
add_dropped_elections_cache (root_l);
}
// Broadcast and request confirmation
// Attempt obtaining votes
else if (election_l->skip_delay || election_l->election_start < cutoff_l)
{
solicitor.add (election_l);
// Broadcast the winner when elections are taking longer to confirm
if (election_l->confirmation_request_count >= min_request_count_flood && election_l->last_broadcast < flood_cutoff && !solicitor.broadcast (*election_l))
{
election_l->last_broadcast = now;
}
// Rate-limited requests for confirmation
else if (election_l->last_request < request_cutoff && !solicitor.add (*election_l))
{
++election_l->confirmation_request_count;
election_l->last_request = now;
}
// Escalate long election after a certain time and number of requests performed
if (election_l->confirmation_request_count > 4 && election_l->election_start < long_election_cutoff_l)
{
@ -307,7 +328,7 @@ void nano::active_transactions::request_loop ()
lock.lock ();
while (!stopped)
while (!stopped && !node.flags.disable_request_loop)
{
// Account for the time spent in request_confirm by defining the wakeup point beforehand
const auto wakeup_l (std::chrono::steady_clock::now () + std::chrono::milliseconds (node.network_params.network.request_interval_ms));
@ -624,7 +645,7 @@ void nano::active_transactions::update_difficulty (std::shared_ptr<nano::block>
else if (opt_transaction_a.is_initialized ())
{
// Only guaranteed to immediately restart the election if the new block is received within 60s of dropping it
static constexpr std::chrono::seconds recently_dropped_cutoff{ 60s };
constexpr std::chrono::seconds recently_dropped_cutoff{ 60s };
if (find_dropped_elections_cache (block_a->qualified_root ()) > std::chrono::steady_clock::now () - recently_dropped_cutoff)
{
lock.unlock ();

View file

@ -116,12 +116,6 @@ public:
void erase_inactive_votes_cache (nano::block_hash const &);
nano::node & node;
std::mutex mutex;
std::chrono::seconds const long_election_threshold;
// Delay until requesting confirmation for an election
std::chrono::milliseconds const election_request_delay;
// Maximum time an election can be kept active if it is extending the container
std::chrono::seconds const election_time_to_live;
static size_t constexpr max_confirm_representatives = 30;
boost::circular_buffer<double> multipliers_cb;
uint64_t trended_active_difficulty;
size_t priority_cementable_frontiers_size ();
@ -133,6 +127,7 @@ public:
void add_dropped_elections_cache (nano::qualified_root const &);
std::chrono::steady_clock::time_point find_dropped_elections_cache (nano::qualified_root const &);
size_t dropped_elections_cache_size ();
nano::confirmation_solicitor solicitor;
private:
// Call action with confirmed block, may be different than what we started with
@ -148,6 +143,20 @@ private:
nano::condition_variable condition;
bool started{ false };
std::atomic<bool> stopped{ false };
// Minimum time an election must be active before escalation
std::chrono::seconds const long_election_threshold;
// Delay until requesting confirmation for an election
std::chrono::milliseconds const election_request_delay;
// Maximum time an election can be kept active if it is extending the container
std::chrono::seconds const election_time_to_live;
// Minimum time between confirmation requests for an election
std::chrono::milliseconds const min_time_between_requests;
// Minimum time between broadcasts of the current winner of an election, as a backup to requesting confirmations
std::chrono::milliseconds const min_time_between_floods;
// Minimum election request count to start broadcasting blocks, as a backup to requesting confirmations
size_t const min_request_count_flood;
// clang-format off
boost::multi_index_container<nano::qualified_root,
mi::indexed_by<
@ -181,7 +190,6 @@ private:
mi::hashed_unique<mi::tag<tag_root>,
mi::member<nano::election_timepoint, nano::qualified_root, &nano::election_timepoint::root>>>>
dropped_elections_cache;
nano::confirmation_solicitor solicitor;
// clang-format on
static size_t constexpr dropped_elections_cache_max{ 32 * 1024 };
boost::thread thread;

View file

@ -1,62 +1,78 @@
#include <nano/node/confirmation_solicitor.hpp>
#include <nano/node/election.hpp>
#include <nano/node/node.hpp>
nano::confirmation_solicitor::confirmation_solicitor (nano::node & node_a) :
node (node_a)
using namespace std::chrono_literals;
nano::confirmation_solicitor::confirmation_solicitor (nano::network & network_a, nano::network_constants const & params_a) :
max_confirm_req_batches (params_a.is_test_network () ? 1 : 20),
max_block_broadcasts (params_a.is_test_network () ? 4 : 30),
network (network_a)
{
}
void nano::confirmation_solicitor::prepare ()
void nano::confirmation_solicitor::prepare (std::vector<nano::representative> const & representatives_a)
{
assert (!prepared);
requests.clear ();
rebroadcasted = 0;
representatives = node.rep_crawler.representatives ();
representatives = representatives_a;
prepared = true;
}
void nano::confirmation_solicitor::add (std::shared_ptr<nano::election> election_a)
bool nano::confirmation_solicitor::broadcast (nano::election const & election_a)
{
assert (prepared);
if (election_a->confirmation_request_count % 8 == 1 && rebroadcasted++ < max_block_broadcasts)
bool result (true);
if (rebroadcasted++ < max_block_broadcasts)
{
node.network.flood_block (election_a->status.winner);
network.flood_block (election_a.status.winner);
result = false;
}
return result;
}
bool nano::confirmation_solicitor::add (nano::election const & election_a)
{
assert (prepared);
auto const max_channel_requests (max_confirm_req_batches * nano::network::confirm_req_hashes_max);
bool result = true;
for (auto const & rep : representatives)
{
if (election_a->last_votes.find (rep.account) == election_a->last_votes.end ())
if (election_a.last_votes.find (rep.account) == election_a.last_votes.end ())
{
requests.insert ({ rep.channel, election_a });
auto & request_queue (requests[rep.channel]);
if (request_queue.size () < max_channel_requests)
{
request_queue.emplace_back (election_a.status.winner->hash (), election_a.status.winner->root ());
result = false;
}
}
}
++election_a->confirmation_request_count;
return result;
}
void nano::confirmation_solicitor::flush ()
{
assert (prepared);
size_t batch_count = 0;
size_t single_count = 0;
for (auto i = requests.begin (), n (requests.end ()); i != n;)
for (auto const & request_queue : requests)
{
if (batch_count++ < max_confirm_req_batches && i->channel->get_network_version () >= node.network_params.protocol.tcp_realtime_protocol_version_min)
auto const & channel (request_queue.first);
std::vector<std::pair<nano::block_hash, nano::root>> roots_hashes_l;
for (auto const & root_hash : request_queue.second)
{
auto channel = i->channel;
std::vector<std::pair<nano::block_hash, nano::root>> roots_hashes_l;
while (i != n && i->channel == channel && roots_hashes_l.size () < nano::network::confirm_req_hashes_max)
roots_hashes_l.push_back (root_hash);
if (roots_hashes_l.size () == nano::network::confirm_req_hashes_max)
{
roots_hashes_l.push_back (std::make_pair (i->election->status.winner->hash (), i->election->status.winner->root ()));
++i;
nano::confirm_req req (roots_hashes_l);
channel->send (req);
roots_hashes_l.clear ();
}
}
if (!roots_hashes_l.empty ())
{
nano::confirm_req req (roots_hashes_l);
channel->send (req);
}
else if (single_count++ < max_confirm_req)
{
node.network.broadcast_confirm_req (i->election->status.winner);
++i;
}
}
prepared = false;
}

View file

@ -1,5 +1,6 @@
#pragma once
#include <nano/node/network.hpp>
#include <nano/node/repcrawler.hpp>
#include <unordered_map>
@ -11,43 +12,28 @@ class node;
/** This class accepts elections that need further votes before they can be confirmed and bundles them in to single confirm_req packets */
class confirmation_solicitor final
{
class request
{
public:
std::shared_ptr<nano::transport::channel> channel;
std::shared_ptr<nano::election> election;
bool operator== (nano::confirmation_solicitor::request const & other_a) const
{
return *channel == *other_a.channel && election == other_a.election;
}
};
class request_hash
{
public:
size_t operator() (nano::confirmation_solicitor::request const & item_a) const
{
return std::hash<std::shared_ptr<nano::election>> () (item_a.election) ^ std::hash<nano::transport::channel> () (*item_a.channel);
}
};
public:
confirmation_solicitor (nano::node &);
confirmation_solicitor (nano::network &, nano::network_constants const &);
/** Prepare object for batching election confirmation requests*/
void prepare ();
/** Add an election that needs to be confirmed */
void add (std::shared_ptr<nano::election>);
/** Bundle hashes together for identical channels in to a single confirm_req by hash packet */
void prepare (std::vector<nano::representative> const &);
/** Broadcast the winner of an election if the broadcast limit has not been reached. Returns false if the broadcast was performed */
bool broadcast (nano::election const &);
/** Add an election that needs to be confirmed. Returns false if successfully added */
bool add (nano::election const &);
/** Dispatch bundled requests to each channel*/
void flush ();
/** The maximum amount of confirmation requests (batches) to be sent to each channel */
size_t const max_confirm_req_batches;
/** The global maximum amount of block broadcasts */
size_t const max_block_broadcasts;
private:
static size_t constexpr max_confirm_req_batches = 20;
static size_t constexpr max_confirm_req = 5;
static size_t constexpr max_block_broadcasts = 30;
nano::network & network;
int rebroadcasted{ 0 };
nano::node & node;
std::vector<nano::representative> representatives;
/** Unique channel/hash to be requested */
std::unordered_set<request, nano::confirmation_solicitor::request_hash> requests;
using vector_root_hashes = std::vector<std::pair<nano::block_hash, nano::root>>;
std::unordered_map<std::shared_ptr<nano::transport::channel>, vector_root_hashes> requests;
bool prepared{ false };
};
}

View file

@ -79,6 +79,8 @@ public:
bool stopped;
std::unordered_map<nano::block_hash, nano::uint128_t> last_tally;
unsigned confirmation_request_count{ 0 };
std::chrono::steady_clock::time_point last_broadcast;
std::chrono::steady_clock::time_point last_request;
std::unordered_set<nano::block_hash> dependent_blocks;
std::chrono::seconds late_blocks_delay{ 5 };
};

View file

@ -118,6 +118,7 @@ public:
bool disable_bootstrap_bulk_pull_server{ false };
bool disable_bootstrap_bulk_push_client{ false };
bool disable_rep_crawler{ false };
bool disable_request_loop{ false };
bool disable_tcp_realtime{ false };
bool disable_udp{ false };
bool disable_unchecked_cleanup{ false };

View file

@ -245,13 +245,14 @@ void nano::rep_crawler::update_weights ()
}
}
std::vector<nano::representative> nano::rep_crawler::representatives (size_t count_a)
std::vector<nano::representative> nano::rep_crawler::representatives (size_t count_a, boost::optional<decltype (nano::protocol_constants::protocol_version_min)> const & opt_version_min_a)
{
auto version_min (opt_version_min_a.value_or (node.network_params.protocol.protocol_version_min));
std::vector<representative> result;
nano::lock_guard<std::mutex> lock (probable_reps_mutex);
for (auto i (probable_reps.get<tag_weight> ().begin ()), n (probable_reps.get<tag_weight> ().end ()); i != n && result.size () < count_a; ++i)
{
if (!i->weight.is_zero ())
if (!i->weight.is_zero () && i->channel->get_network_version () >= version_min)
{
result.push_back (*i);
}

View file

@ -9,6 +9,7 @@
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/optional.hpp>
#include <chrono>
#include <memory>
@ -103,8 +104,8 @@ public:
/** Get total available weight from representatives */
nano::uint128_t total_weight () const;
/** Request a list of the top \p count_a known representatives in descending order of weight. */
std::vector<representative> representatives (size_t count_a = std::numeric_limits<size_t>::max ());
/** Request a list of the top \p count_a known representatives in descending order of weight, optionally with a minimum version \p opt_version_min_a */
std::vector<representative> representatives (size_t count_a = std::numeric_limits<size_t>::max (), boost::optional<decltype (nano::protocol_constants::protocol_version_min)> const & opt_version_min_a = boost::none);
/** Request a list of the top \p count_a known representative endpoints. */
std::vector<std::shared_ptr<nano::transport::channel>> representative_endpoints (size_t count_a);