Flood difficulty updates from RPC process (#2753)

* Flood difficulty updates from RPC process

This is currently done from the work watcher which doesn't go through ledger processing.

Now, there is a new `process_old` which floods the block if it was locally produced and there was a work update.

* (unrelated) assert no error on system.poll_until_true in a telemetry test
This commit is contained in:
Guilherme Lawless 2020-05-04 16:24:18 +01:00 committed by GitHub
commit 56a8ec8a21
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 113 additions and 25 deletions

View file

@ -644,9 +644,9 @@ TEST (node_telemetry, remove_peer_invalid_signature)
node->network.process_message (telemetry_ack, channel);
ASSERT_TIMELY (10s, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::invalid_signature) > 0);
system.poll_until_true (3s, [&node, address = channel->get_endpoint ().address ()]() -> bool {
ASSERT_NO_ERROR (system.poll_until_true (3s, [&node, address = channel->get_endpoint ().address ()]() -> bool {
nano::lock_guard<std::mutex> guard (node->network.excluded_peers.mutex);
return node->network.excluded_peers.peers.get<nano::peer_exclusion::tag_endpoint> ().count (address);
});
}));
}
}

View file

@ -640,19 +640,16 @@ bool nano::active_transactions::update_difficulty (nano::block const & block_a)
{
nano::lock_guard<std::mutex> guard (mutex);
auto existing_election (roots.get<tag_root> ().find (block_a.qualified_root ()));
auto found = existing_election != roots.get<tag_root> ().end ();
if (found)
{
update_difficulty_impl (existing_election, block_a);
}
return !found;
bool error = existing_election == roots.get<tag_root> ().end () || update_difficulty_impl (existing_election, block_a);
return error;
}
void nano::active_transactions::update_difficulty_impl (nano::active_transactions::roots_iterator const & root_it_a, nano::block const & block_a)
bool nano::active_transactions::update_difficulty_impl (nano::active_transactions::roots_iterator const & root_it_a, nano::block const & block_a)
{
debug_assert (!mutex.try_lock ());
double multiplier (normalized_multiplier (block_a, root_it_a));
if (multiplier > root_it_a->multiplier)
bool error = multiplier <= root_it_a->multiplier;
if (!error)
{
if (node.config.logging.active_update_logging ())
{
@ -664,12 +661,14 @@ void nano::active_transactions::update_difficulty_impl (nano::active_transaction
add_adjust_difficulty (block_a.hash ());
node.stats.inc (nano::stat::type::election, nano::stat::detail::election_difficulty_update);
}
return error;
}
void nano::active_transactions::restart (std::shared_ptr<nano::block> const & block_a, nano::write_transaction const & transaction_a)
bool nano::active_transactions::restart (std::shared_ptr<nano::block> const & block_a, nano::write_transaction const & transaction_a)
{
// Only guaranteed to restart the election if the new block is received within 2 minutes of its election being dropped
constexpr std::chrono::minutes recently_dropped_cutoff{ 2 };
bool error = true;
if (recently_dropped.find (block_a->qualified_root ()) > std::chrono::steady_clock::now () - recently_dropped_cutoff)
{
auto hash (block_a->hash ());
@ -691,6 +690,7 @@ void nano::active_transactions::restart (std::shared_ptr<nano::block> const & bl
auto insert_result = insert (ledger_block, previous_balance);
if (insert_result.inserted)
{
error = false;
insert_result.election->transition_active ();
recently_dropped.erase (ledger_block->qualified_root ());
node.stats.inc (nano::stat::type::election, nano::stat::detail::election_restart);
@ -698,6 +698,7 @@ void nano::active_transactions::restart (std::shared_ptr<nano::block> const & bl
}
}
}
return error;
}
double nano::active_transactions::normalized_multiplier (nano::block const & block_a, boost::optional<nano::active_transactions::roots_iterator> const & root_it_a) const

View file

@ -145,9 +145,10 @@ public:
bool active (nano::block const &);
bool active (nano::qualified_root const &);
std::shared_ptr<nano::election> election (nano::qualified_root const &) const;
// Returns true if this block was not active
// Returns false if the election difficulty was updated
bool update_difficulty (nano::block const &);
void restart (std::shared_ptr<nano::block> const &, nano::write_transaction const &);
// Returns false if the election was restarted
bool restart (std::shared_ptr<nano::block> const &, nano::write_transaction const &);
double normalized_multiplier (nano::block const &, boost::optional<roots_iterator> const & = boost::none) const;
void add_adjust_difficulty (nano::block_hash const &);
void update_adjusted_multiplier ();
@ -197,7 +198,8 @@ private:
// clang-format off
nano::election_insertion_result insert_impl (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, std::function<void(std::shared_ptr<nano::block>)> const & = [](std::shared_ptr<nano::block>) {});
// clang-format on
void update_difficulty_impl (roots_iterator const &, nano::block const &);
// Returns false if the election difficulty was updated
bool update_difficulty_impl (roots_iterator const &, nano::block const &);
void request_loop ();
void confirm_prioritized_frontiers (nano::transaction const & transaction_a);
void request_confirm (nano::unique_lock<std::mutex> &);

View file

@ -265,7 +265,7 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
}
}
void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> block_a, nano::process_return const & process_return_a, const bool watch_work_a, const bool initial_publish_a)
void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> block_a, nano::process_return const & process_return_a, const bool watch_work_a, nano::block_origin const origin_a)
{
// Add to work watcher to prevent dropping the election
if (watch_work_a)
@ -281,7 +281,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
}
// Announce block contents to the network
if (initial_publish_a)
if (origin_a == nano::block_origin::local)
{
node.network.flood_block_initial (block_a);
}
@ -296,7 +296,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
}
}
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a, const bool first_publish_a)
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a, nano::block_origin const origin_a)
{
nano::process_return result;
auto hash (info_a.block->hash ());
@ -314,7 +314,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
}
if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash))
{
process_live (hash, info_a.block, result, watch_work_a, first_publish_a);
process_live (hash, info_a.block, result, watch_work_a, origin_a);
}
queue_unchecked (transaction_a, hash);
break;
@ -374,10 +374,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
node.logger.try_log (boost::str (boost::format ("Old for: %1%") % hash.to_string ()));
}
queue_unchecked (transaction_a, hash);
if (node.active.update_difficulty (*info_a.block))
{
node.active.restart (info_a.block, transaction_a);
}
process_old (transaction_a, info_a.block, origin_a);
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::old);
break;
}
@ -464,6 +461,19 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
return result;
}
void nano::block_processor::process_old (nano::write_transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a, nano::block_origin const origin_a)
{
// First try to update election difficulty, then attempt to restart an election
if (!node.active.update_difficulty (*block_a) || !node.active.restart (block_a, transaction_a))
{
// Let others know about the difficulty update
if (origin_a == nano::block_origin::local)
{
node.network.flood_block_initial (block_a);
}
}
}
void nano::block_processor::queue_unchecked (nano::write_transaction const & transaction_a, nano::block_hash const & hash_a)
{
auto unchecked_blocks (node.store.unchecked_get (transaction_a, hash_a));

View file

@ -21,6 +21,12 @@ class transaction;
class write_transaction;
class write_database_queue;
enum class block_origin
{
local,
remote
};
/**
* Processing blocks is a potentially long IO operation.
* This class isolates block insertion from other operations like servicing network operations
@ -42,7 +48,7 @@ public:
bool should_log ();
bool have_blocks ();
void process_blocks ();
nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false, const bool = false);
nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false, nano::block_origin const = nano::block_origin::remote);
nano::process_return process_one (nano::write_transaction const &, std::shared_ptr<nano::block>, const bool = false);
std::atomic<bool> flushing{ false };
// Delay required for average network propagartion before requesting confirmation
@ -51,7 +57,8 @@ public:
private:
void queue_unchecked (nano::write_transaction const &, nano::block_hash const &);
void process_batch (nano::unique_lock<std::mutex> &);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>, nano::process_return const &, const bool = false, const bool = false);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>, nano::process_return const &, const bool = false, nano::block_origin const = nano::block_origin::remote);
void process_old (nano::write_transaction const &, std::shared_ptr<nano::block> const &, nano::block_origin const);
void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &);
void process_verified_state_blocks (std::deque<nano::unchecked_info> &, std::vector<int> const &, std::vector<nano::block_hash> const &, std::vector<nano::signature> const &);
bool stopped{ false };

View file

@ -612,7 +612,7 @@ nano::process_return nano::node::process_local (std::shared_ptr<nano::block> blo
block_processor.wait_write ();
// Process block
auto transaction (store.tx_begin_write ({ tables::accounts, tables::cached_counts, tables::change_blocks, tables::frontiers, tables::open_blocks, tables::pending, tables::receive_blocks, tables::representation, tables::send_blocks, tables::state_blocks }, { tables::confirmation_height }));
return block_processor.process_one (transaction, info, work_watcher_a, true);
return block_processor.process_one (transaction, info, work_watcher_a, nano::block_origin::local);
}
void nano::node::start ()

View file

@ -2225,6 +2225,74 @@ TEST (rpc, process_ledger_insufficient_work)
ASSERT_EQ (response.json.get<std::string> ("error"), ec.message ());
}
// Ensure that processing an old block with updated work floods it to peers
TEST (rpc, process_difficulty_update_flood)
{
nano::system system (1);
auto & node_passive = *system.nodes[0];
auto & node = *add_ipc_enabled_node (system);
auto latest (node.latest (nano::test_genesis_key.pub));
nano::state_block send (nano::genesis_account, latest, nano::genesis_account, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (latest));
scoped_io_thread_name_change scoped_thread_name_io;
nano::node_rpc_config node_rpc_config;
nano::ipc::ipc_server ipc_server (node, node_rpc_config);
nano::rpc_config rpc_config (nano::get_available_port (), true);
rpc_config.rpc_process.ipc_port = node.config.ipc_config.transport_tcp.port;
nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config);
nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor);
rpc.start ();
boost::property_tree::ptree request;
request.put ("action", "process");
// Must not watch work, otherwise the work watcher could update the block and flood it, whereas we want to ensure flooding happens on demand, without the work watcher
request.put ("watch_work", false);
{
std::string json;
send.serialize_json (json);
request.put ("block", json);
test_response response (request, rpc.config.port, system.io_ctx);
ASSERT_TIMELY (5s, response.status != 0);
ASSERT_EQ (200, response.status);
ASSERT_EQ (0, response.json.count ("error"));
}
ASSERT_TIMELY (5s, node_passive.active.size () == 1 && node_passive.block (send.hash ()) != nullptr);
// Update block work
node.work_generate_blocking (send, send.difficulty ());
auto expected_multiplier = nano::normalized_multiplier (nano::difficulty::to_multiplier (send.difficulty (), nano::work_threshold (send.work_version (), nano::block_details (nano::epoch::epoch_0, true, false, false))), node.network_params.network.publish_thresholds.epoch_1);
{
std::string json;
send.serialize_json (json);
request.put ("block", json);
std::error_code ec (nano::error_process::old);
test_response response (request, rpc.config.port, system.io_ctx);
ASSERT_TIMELY (5s, response.status != 0);
ASSERT_EQ (200, response.status);
ASSERT_EQ (response.json.get<std::string> ("error"), ec.message ());
}
// Ensure the difficulty update occurs in both nodes
ASSERT_NO_ERROR (system.poll_until_true (5s, [&node, &node_passive, &send, expected_multiplier] {
nano::lock_guard<std::mutex> guard (node.active.mutex);
auto const existing (node.active.roots.find (send.qualified_root ()));
EXPECT_NE (existing, node.active.roots.end ());
nano::lock_guard<std::mutex> guard_passive (node_passive.active.mutex);
auto const existing_passive (node_passive.active.roots.find (send.qualified_root ()));
EXPECT_NE (existing_passive, node_passive.active.roots.end ());
bool updated = existing->multiplier == expected_multiplier;
bool updated_passive = existing_passive->multiplier == expected_multiplier;
return updated && updated_passive;
}));
}
TEST (rpc, keepalive)
{
nano::system system;