From 3d5c2d28d50376c656e7d537aecd821f5a231bfd Mon Sep 17 00:00:00 2001 From: Russel Waters Date: Sun, 11 Aug 2019 13:07:51 -0400 Subject: [PATCH] RPC publish optional work_watch flag to add to work_watcher (#2168) * publishes from rpc send, publish and wallet send, publish will no longer be eligible to drop due to bandwidth limiting * PR feedback edits * formatting * add flag watch_work, defaulting to true to process RPC call * normalize bool name to fit other usages * add watch_work setting to false to require enable_control * default watch_work is true, should check for false --- nano/node/blockprocessor.cpp | 17 +++++---- nano/node/blockprocessor.hpp | 6 ++-- nano/node/bootstrap.cpp | 8 ++--- nano/node/election.cpp | 2 +- nano/node/json_handler.cpp | 3 +- nano/node/network.cpp | 4 +-- nano/node/network.hpp | 7 ++-- nano/node/node.cpp | 4 +-- nano/node/node.hpp | 2 +- nano/node/transport/transport.cpp | 4 +-- nano/node/transport/transport.hpp | 2 +- nano/node/wallet.cpp | 4 +-- nano/rpc/rpc_handler.cpp | 3 +- nano/rpc_test/rpc.cpp | 60 +++++++++++++++++++++++++++++++ 14 files changed, 97 insertions(+), 29 deletions(-) diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 0a506963..f4f4b747 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -354,12 +354,17 @@ void nano::block_processor::process_batch (std::unique_lock & lock_a } } -void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr block_a) +void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr block_a, const bool watch_work_a) { // Start collecting quorum on block node.active.start (block_a); + //add block to watcher if desired after block has been added to active + if (watch_work_a) + { + node.wallets.watcher.add (block_a); + } // Announce block contents to the network - node.network.flood_block (block_a); + node.network.flood_block (block_a, false); if (node.config.enable_voting) { // Announce our weighted vote to the network @@ -389,7 +394,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std:: }); } -nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, nano::unchecked_info info_a) +nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a) { nano::process_return result; auto hash (info_a.block->hash ()); @@ -407,7 +412,7 @@ nano::process_return nano::block_processor::process_one (nano::transaction const } if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash)) { - process_live (hash, info_a.block); + process_live (hash, info_a.block, watch_work_a); } queue_unchecked (transaction_a, hash); break; @@ -522,10 +527,10 @@ nano::process_return nano::block_processor::process_one (nano::transaction const return result; } -nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, std::shared_ptr block_a) +nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, std::shared_ptr block_a, const bool watch_work_a) { nano::unchecked_info info (block_a, block_a->account (), 0, nano::signature_verification::unknown); - auto result (process_one (transaction_a, info)); + auto result (process_one (transaction_a, info, watch_work_a)); return result; } diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 5b29d319..12cd3c13 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -45,8 +45,8 @@ public: bool should_log (bool); bool have_blocks (); void process_blocks (); - nano::process_return process_one (nano::transaction const &, nano::unchecked_info); - nano::process_return process_one (nano::transaction const &, std::shared_ptr); + nano::process_return process_one (nano::transaction const &, nano::unchecked_info, const bool = false); + nano::process_return process_one (nano::transaction const &, std::shared_ptr, const bool = false); nano::vote_generator generator; // Delay required for average network propagartion before requesting confirmation static std::chrono::milliseconds constexpr confirmation_request_delay{ 1500 }; @@ -55,7 +55,7 @@ private: void queue_unchecked (nano::transaction const &, nano::block_hash const &); void verify_state_blocks (nano::transaction const & transaction_a, std::unique_lock &, size_t = std::numeric_limits::max ()); void process_batch (std::unique_lock &); - void process_live (nano::block_hash const &, std::shared_ptr); + void process_live (nano::block_hash const &, std::shared_ptr, const bool = false); bool stopped; bool active; bool awaiting_write{ false }; diff --git a/nano/node/bootstrap.cpp b/nano/node/bootstrap.cpp index 4e4929e6..e7cf77c7 100644 --- a/nano/node/bootstrap.cpp +++ b/nano/node/bootstrap.cpp @@ -81,7 +81,7 @@ void nano::frontier_req_client::run () } } }, - false); // is bootstrap traffic is_dropable false + false); // is bootstrap traffic is_droppable false } std::shared_ptr nano::bootstrap_client::shared () @@ -352,7 +352,7 @@ void nano::bulk_pull_client::request () this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_request_failure, nano::stat::dir::in); } }, - false); // is bootstrap traffic is_dropable false + false); // is bootstrap traffic is_droppable false } void nano::bulk_pull_client::receive_block () @@ -539,7 +539,7 @@ void nano::bulk_push_client::start () } } }, - false); // is bootstrap traffic is_dropable false + false); // is bootstrap traffic is_droppable false } void nano::bulk_push_client::push (nano::transaction const & transaction_a) @@ -678,7 +678,7 @@ void nano::bulk_pull_account_client::request () this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_error_starting_request, nano::stat::dir::in); } }, - false); // is bootstrap traffic is_dropable false + false); // is bootstrap traffic is_droppable false } void nano::bulk_pull_account_client::receive_pending () diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 4bf4dfd5..827cb910 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -223,7 +223,7 @@ bool nano::election::publish (std::shared_ptr block_a) { blocks.insert (std::make_pair (block_a->hash (), block_a)); confirm_if_quorum (transaction); - node.network.flood_block (block_a); + node.network.flood_block (block_a, false); } else { diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 207ed266..403c7723 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -2854,6 +2854,7 @@ void nano::json_handler::payment_wait () void nano::json_handler::process () { const bool json_block_l = request.get ("json_block", false); + const bool watch_work_l = request.get ("watch_work", true); std::shared_ptr block; if (json_block_l) { @@ -2935,7 +2936,7 @@ void nano::json_handler::process () { if (!nano::work_validate (*block)) { - auto result (node.process_local (block)); + auto result (node.process_local (block, watch_work_l)); switch (result.code) { case nano::process_result::progress: diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 8687af61..f78445c6 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -215,12 +215,12 @@ bool nano::network::send_votes_cache (std::shared_ptr return result; } -void nano::network::flood_message (nano::message const & message_a) +void nano::network::flood_message (nano::message const & message_a, bool const is_droppable_a) { auto list (list_fanout ()); for (auto i (list.begin ()), n (list.end ()); i != n; ++i) { - (*i)->send (message_a); + (*i)->send (message_a, nullptr, is_droppable_a); } } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 6ada45cb..c60ebdbe 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -114,7 +114,7 @@ public: ~network (); void start (); void stop (); - void flood_message (nano::message const &); + void flood_message (nano::message const &, bool const = true); void flood_keepalive () { nano::keepalive message; @@ -126,11 +126,12 @@ public: nano::confirm_ack message (vote_a); flood_message (message); } - void flood_block (std::shared_ptr block_a) + void flood_block (std::shared_ptr block_a, bool const is_droppable_a = true) { nano::publish publish (block_a); - flood_message (publish); + flood_message (publish, is_droppable_a); } + void flood_block_batch (std::deque>, unsigned = broadcast_interval_ms); void merge_peers (std::array const &); void merge_peer (nano::endpoint const &); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 8061508f..ec85ffb6 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -608,7 +608,7 @@ nano::process_return nano::node::process (nano::block const & block_a) return result; } -nano::process_return nano::node::process_local (std::shared_ptr block_a) +nano::process_return nano::node::process_local (std::shared_ptr block_a, bool const work_watcher_a) { // Add block hash as recently arrived to trigger automatic rebroadcast and election block_arrival.add (block_a->hash ()); @@ -618,7 +618,7 @@ nano::process_return nano::node::process_local (std::shared_ptr blo block_processor.wait_write (); // Process block auto transaction (store.tx_begin_write ()); - return block_processor.process_one (transaction, info); + return block_processor.process_one (transaction, info, work_watcher_a); } void nano::node::start () diff --git a/nano/node/node.hpp b/nano/node/node.hpp index ec081290..0d1ad77b 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -105,7 +105,7 @@ public: void process_confirmed (nano::election_status const &, uint8_t = 0); void process_active (std::shared_ptr); nano::process_return process (nano::block const &); - nano::process_return process_local (std::shared_ptr); + nano::process_return process_local (std::shared_ptr, bool const = false); void keepalive_preconfigured (std::vector const &); nano::block_hash latest (nano::account const &); nano::uint128_t balance (nano::account const &); diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 16466a37..548e4217 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -75,13 +75,13 @@ node (node_a) { } -void nano::transport::channel::send (nano::message const & message_a, std::function const & callback_a, bool const & is_dropable) +void nano::transport::channel::send (nano::message const & message_a, std::function const & callback_a, bool const is_droppable_a) { callback_visitor visitor; message_a.visit (visitor); auto buffer (message_a.to_bytes ()); auto detail (visitor.result); - if (!is_dropable || !limiter.should_drop (buffer->size ())) + if (!is_droppable_a || !limiter.should_drop (buffer->size ())) { send_buffer (buffer, detail, callback_a); node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out); diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 2fda8ea2..af8f2c29 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -53,7 +53,7 @@ namespace transport virtual ~channel () = default; virtual size_t hash_code () const = 0; virtual bool operator== (nano::transport::channel const &) const = 0; - void send (nano::message const &, std::function const & = nullptr, bool const & = true); + void send (nano::message const &, std::function const & = nullptr, bool const = true); virtual void send_buffer (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) = 0; virtual std::function callback (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const = 0; virtual std::string to_string () const = 0; diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index 280d3423..139d8400 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -1069,7 +1069,7 @@ std::shared_ptr nano::wallet::send_action (nano::account const & so if (block != nullptr) { cached_block = true; - wallets.node.network.flood_block (block); + wallets.node.network.flood_block (block, false); } } else if (status != MDB_NOTFOUND) @@ -1500,7 +1500,7 @@ void nano::work_watcher::run () current->second = block; } } - node.network.flood_block (block); + node.network.flood_block (block, false); node.active.update_difficulty (*block.get ()); lock.lock (); if (stopped) diff --git a/nano/rpc/rpc_handler.cpp b/nano/rpc/rpc_handler.cpp index 298ba1b2..f990c9b8 100644 --- a/nano/rpc/rpc_handler.cpp +++ b/nano/rpc/rpc_handler.cpp @@ -88,7 +88,8 @@ void nano::rpc_handler::process_request () else if (action == "process") { auto force = request.get_optional ("force"); - if (force.is_initialized () && *force && !rpc_config.enable_control) + auto watch_work = request.get_optional ("watch_work"); + if (((force.is_initialized () && *force) || (watch_work.is_initialized () && !*watch_work)) && !rpc_config.enable_control) { json_error_response (response, rpc_control_disabled_ec.message ()); error = true; diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 7ccf4f65..139f6fd0 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -1581,6 +1581,66 @@ TEST (rpc, process_block) ASSERT_EQ (send.hash ().to_string (), send_hash); } +TEST (rpc, process_block_with_work_watcher) +{ + nano::system system; + nano::node_config node_config (24000, system.logging); + node_config.enable_voting = false; + auto & node1 = *system.add_node (node_config); + nano::keypair key; + auto latest (system.nodes[0]->latest (nano::test_genesis_key.pub)); + auto send (std::make_shared (nano::test_genesis_key.pub, latest, nano::test_genesis_key.pub, nano::genesis_amount - 100, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest))); + uint64_t difficulty1 (0); + nano::work_validate (*send, &difficulty1); + auto multiplier1 = nano::difficulty::to_multiplier (difficulty1, node1.network_params.network.publish_threshold); + enable_ipc_transport_tcp (node1.config.ipc_config.transport_tcp); + nano::node_rpc_config node_rpc_config; + nano::ipc::ipc_server ipc_server (node1, node_rpc_config); + nano::rpc_config rpc_config (true); + nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config); + nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor); + rpc.start (); + boost::property_tree::ptree request; + request.put ("action", "process"); + request.put ("work_watcher", true); + std::string json; + send->serialize_json (json); + request.put ("block", json); + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (5s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + system.deadline_set (10s); + while (system.nodes[0]->latest (nano::test_genesis_key.pub) != send->hash ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + system.deadline_set (10s); + auto updated (false); + uint64_t updated_difficulty; + while (!updated) + { + std::unique_lock lock (node1.active.mutex); + //fill multipliers_cb and update active difficulty; + for (auto i (0); i < node1.active.multipliers_cb.size (); i++) + { + node1.active.multipliers_cb.push_back (multiplier1 * (1 + i / 100.)); + } + node1.active.update_active_difficulty (lock); + auto const existing (node1.active.roots.find (send->qualified_root ())); + //if existing is junk the block has been confirmed already + ASSERT_NE (existing, node1.active.roots.end ()); + updated = existing->difficulty != difficulty1; + updated_difficulty = existing->difficulty; + lock.unlock (); + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_GT (updated_difficulty, difficulty1); +} + TEST (rpc, process_block_no_work) { nano::system system (24000, 1);