Support multiple work peers in the same host (#2477)
* Correct check for peers when creating work * Support multiple work peers in the same address * Send cancels despite errors to account for non-conforming implementations * Add tests using a fake work peer, acting as good, malicious or slow * Comment
This commit is contained in:
parent
9557ff90f0
commit
eca1ec8aeb
6 changed files with 425 additions and 59 deletions
|
@ -1,6 +1,7 @@
|
|||
add_executable (core_test
|
||||
core_test_main.cc
|
||||
testutil.hpp
|
||||
fakes/work_peer.hpp
|
||||
active_transactions.cpp
|
||||
block.cpp
|
||||
block_store.cpp
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
#include <nano/core_test/fakes/work_peer.hpp>
|
||||
#include <nano/core_test/testutil.hpp>
|
||||
#include <nano/node/testing.hpp>
|
||||
|
||||
|
@ -148,3 +149,122 @@ TEST (distributed_work, no_peers_multi)
|
|||
}
|
||||
count = 0;
|
||||
}
|
||||
|
||||
TEST (distributed_work, peer)
|
||||
{
|
||||
nano::system system;
|
||||
nano::node_config node_config;
|
||||
node_config.peering_port = nano::get_available_port ();
|
||||
// Disable local work generation
|
||||
node_config.work_threads = 0;
|
||||
auto node (system.add_node (node_config));
|
||||
ASSERT_FALSE (node->local_work_generation_enabled ());
|
||||
nano::block_hash hash{ 1 };
|
||||
boost::optional<uint64_t> work;
|
||||
std::atomic<bool> done{ false };
|
||||
auto callback = [&work, &done](boost::optional<uint64_t> work_a) {
|
||||
ASSERT_TRUE (work_a.is_initialized ());
|
||||
work = work_a;
|
||||
done = true;
|
||||
};
|
||||
auto work_peer (std::make_shared<fake_work_peer> (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::good));
|
||||
work_peer->start ();
|
||||
decltype (node->config.work_peers) peers;
|
||||
peers.emplace_back ("localhost", work_peer->port ());
|
||||
ASSERT_FALSE (node->distributed_work.make (hash, peers, callback, node->network_params.network.publish_threshold, nano::account ()));
|
||||
system.deadline_set (5s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_FALSE (nano::work_validate (hash, *work));
|
||||
ASSERT_EQ (1, work_peer->generations_good);
|
||||
ASSERT_EQ (0, work_peer->generations_bad);
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
ASSERT_EQ (0, work_peer->cancels);
|
||||
}
|
||||
|
||||
TEST (distributed_work, peer_malicious)
|
||||
{
|
||||
nano::system system (1);
|
||||
auto node (system.nodes[0]);
|
||||
ASSERT_TRUE (node->local_work_generation_enabled ());
|
||||
nano::block_hash hash{ 1 };
|
||||
boost::optional<uint64_t> work;
|
||||
std::atomic<bool> done{ false };
|
||||
auto callback = [&work, &done](boost::optional<uint64_t> work_a) {
|
||||
ASSERT_TRUE (work_a.is_initialized ());
|
||||
work = work_a;
|
||||
done = true;
|
||||
};
|
||||
auto malicious_peer (std::make_shared<fake_work_peer> (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::malicious));
|
||||
malicious_peer->start ();
|
||||
decltype (node->config.work_peers) peers;
|
||||
peers.emplace_back ("localhost", malicious_peer->port ());
|
||||
ASSERT_FALSE (node->distributed_work.make (hash, peers, callback, node->network_params.network.publish_threshold, nano::account ()));
|
||||
system.deadline_set (5s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_FALSE (nano::work_validate (hash, *work));
|
||||
system.deadline_set (3s);
|
||||
while (malicious_peer->generations_bad < 2)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
// make sure it was *not* the malicious peer that replied
|
||||
ASSERT_EQ (0, malicious_peer->generations_good);
|
||||
// initial generation + the second time when it also starts doing local generation
|
||||
ASSERT_EQ (2, malicious_peer->generations_bad);
|
||||
// this peer should not receive a cancel
|
||||
ASSERT_EQ (0, malicious_peer->cancels);
|
||||
}
|
||||
|
||||
TEST (distributed_work, peer_multi)
|
||||
{
|
||||
nano::system system (1);
|
||||
auto node (system.nodes[0]);
|
||||
ASSERT_TRUE (node->local_work_generation_enabled ());
|
||||
nano::block_hash hash{ 1 };
|
||||
boost::optional<uint64_t> work;
|
||||
std::atomic<bool> done{ false };
|
||||
auto callback = [&work, &done](boost::optional<uint64_t> work_a) {
|
||||
ASSERT_TRUE (work_a.is_initialized ());
|
||||
work = work_a;
|
||||
done = true;
|
||||
};
|
||||
auto good_peer (std::make_shared<fake_work_peer> (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::good));
|
||||
auto malicious_peer (std::make_shared<fake_work_peer> (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::malicious));
|
||||
auto slow_peer (std::make_shared<fake_work_peer> (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::slow));
|
||||
good_peer->start ();
|
||||
malicious_peer->start ();
|
||||
slow_peer->start ();
|
||||
decltype (node->config.work_peers) peers;
|
||||
peers.emplace_back ("localhost", malicious_peer->port ());
|
||||
peers.emplace_back ("localhost", slow_peer->port ());
|
||||
peers.emplace_back ("localhost", good_peer->port ());
|
||||
ASSERT_FALSE (node->distributed_work.make (hash, peers, callback, node->network_params.network.publish_threshold, nano::account ()));
|
||||
system.deadline_set (5s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_FALSE (nano::work_validate (hash, *work));
|
||||
system.deadline_set (3s);
|
||||
while (slow_peer->cancels < 1)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (0, malicious_peer->generations_good);
|
||||
ASSERT_EQ (1, malicious_peer->generations_bad);
|
||||
ASSERT_EQ (0, malicious_peer->cancels);
|
||||
|
||||
ASSERT_EQ (0, slow_peer->generations_good);
|
||||
ASSERT_EQ (0, slow_peer->generations_bad);
|
||||
ASSERT_EQ (1, slow_peer->cancels);
|
||||
|
||||
ASSERT_EQ (1, good_peer->generations_good);
|
||||
ASSERT_EQ (0, good_peer->generations_bad);
|
||||
ASSERT_EQ (0, good_peer->cancels);
|
||||
}
|
||||
|
|
252
nano/core_test/fakes/work_peer.hpp
Normal file
252
nano/core_test/fakes/work_peer.hpp
Normal file
|
@ -0,0 +1,252 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/lib/errors.hpp>
|
||||
#include <nano/lib/locks.hpp>
|
||||
#include <nano/lib/numbers.hpp>
|
||||
#include <nano/lib/work.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/beast/core.hpp>
|
||||
#include <boost/beast/http.hpp>
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
#include <boost/property_tree/ptree.hpp>
|
||||
|
||||
#include <unordered_set>
|
||||
|
||||
namespace beast = boost::beast;
|
||||
namespace http = beast::http;
|
||||
namespace ptree = boost::property_tree;
|
||||
namespace asio = boost::asio;
|
||||
using tcp = boost::asio::ip::tcp;
|
||||
|
||||
namespace
|
||||
{
|
||||
enum class work_peer_type
|
||||
{
|
||||
good,
|
||||
malicious,
|
||||
slow
|
||||
};
|
||||
|
||||
class work_peer_connection : public std::enable_shared_from_this<work_peer_connection>
|
||||
{
|
||||
const std::string generic_error = "Unable to parse JSON";
|
||||
const std::string empty_response = "Empty response";
|
||||
|
||||
public:
|
||||
work_peer_connection (asio::io_context & ioc_a, work_peer_type const type_a, nano::work_pool & pool_a, std::function<void(bool const)> on_generation_a, std::function<void()> on_cancel_a) :
|
||||
socket (ioc_a),
|
||||
type (type_a),
|
||||
work_pool (pool_a),
|
||||
on_generation (on_generation_a),
|
||||
on_cancel (on_cancel_a),
|
||||
timer (ioc_a)
|
||||
{
|
||||
}
|
||||
void start ()
|
||||
{
|
||||
read_request ();
|
||||
}
|
||||
tcp::socket socket;
|
||||
|
||||
private:
|
||||
work_peer_type type;
|
||||
nano::work_pool & work_pool;
|
||||
beast::flat_buffer buffer{ 8192 };
|
||||
http::request<http::string_body> request;
|
||||
http::response<http::dynamic_body> response;
|
||||
std::function<void(bool const)> on_generation;
|
||||
std::function<void()> on_cancel;
|
||||
asio::deadline_timer timer;
|
||||
|
||||
void read_request ()
|
||||
{
|
||||
auto this_l = shared_from_this ();
|
||||
http::async_read (socket, buffer, request, [this_l](beast::error_code ec, std::size_t const /*size_a*/) {
|
||||
if (!ec)
|
||||
{
|
||||
this_l->process_request ();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void process_request ()
|
||||
{
|
||||
switch (request.method ())
|
||||
{
|
||||
case http::verb::post:
|
||||
response.result (http::status::ok);
|
||||
create_response ();
|
||||
break;
|
||||
|
||||
default:
|
||||
response.result (http::status::bad_request);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void create_response ()
|
||||
{
|
||||
std::stringstream istream (request.body ());
|
||||
try
|
||||
{
|
||||
ptree::ptree result;
|
||||
ptree::read_json (istream, result);
|
||||
handle (result);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
error (generic_error);
|
||||
write_response ();
|
||||
}
|
||||
response.version (request.version ());
|
||||
response.keep_alive (false);
|
||||
}
|
||||
|
||||
void write_response ()
|
||||
{
|
||||
auto this_l = shared_from_this ();
|
||||
response.set (http::field::content_length, response.body ().size ());
|
||||
http::async_write (socket, response, [this_l](beast::error_code ec, std::size_t /*size_a*/) {
|
||||
this_l->socket.shutdown (tcp::socket::shutdown_send, ec);
|
||||
this_l->socket.close ();
|
||||
});
|
||||
}
|
||||
|
||||
void error (std::string const & message_a)
|
||||
{
|
||||
ptree::ptree error_l;
|
||||
error_l.put ("error", message_a);
|
||||
std::stringstream ostream;
|
||||
ptree::write_json (ostream, error_l);
|
||||
beast::ostream (response.body ()) << ostream.str ();
|
||||
}
|
||||
|
||||
void handle_generate (nano::block_hash const & hash_a)
|
||||
{
|
||||
if (type == work_peer_type::good)
|
||||
{
|
||||
auto hash = hash_a;
|
||||
auto this_l (shared_from_this ());
|
||||
work_pool.generate (hash, [this_l, hash](boost::optional<uint64_t> work_a) {
|
||||
auto result = work_a.value_or (0);
|
||||
uint64_t difficulty;
|
||||
nano::work_validate (hash, result, &difficulty);
|
||||
static nano::network_params params;
|
||||
ptree::ptree message_l;
|
||||
message_l.put ("work", nano::to_string_hex (result));
|
||||
message_l.put ("difficulty", nano::to_string_hex (difficulty));
|
||||
message_l.put ("multiplier", nano::to_string (nano::difficulty::to_multiplier (difficulty, params.network.publish_threshold)));
|
||||
message_l.put ("hash", hash.to_string ());
|
||||
std::stringstream ostream;
|
||||
ptree::write_json (ostream, message_l);
|
||||
beast::ostream (this_l->response.body ()) << ostream.str ();
|
||||
// Delay response by 500ms as a slow peer, immediate async call for a good peer
|
||||
this_l->timer.expires_from_now (boost::posix_time::milliseconds (this_l->type == work_peer_type::slow ? 500 : 0));
|
||||
this_l->timer.async_wait ([this_l, result](const boost::system::error_code & ec) {
|
||||
if (this_l->on_generation)
|
||||
{
|
||||
this_l->on_generation (result != 0);
|
||||
}
|
||||
this_l->write_response ();
|
||||
});
|
||||
});
|
||||
}
|
||||
else if (type == work_peer_type::malicious)
|
||||
{
|
||||
// Respond immediately with no work
|
||||
on_generation (false);
|
||||
write_response ();
|
||||
}
|
||||
}
|
||||
|
||||
void handle (ptree::ptree const & tree_a)
|
||||
{
|
||||
auto action_text (tree_a.get<std::string> ("action"));
|
||||
auto hash_text (tree_a.get<std::string> ("hash"));
|
||||
nano::block_hash hash;
|
||||
hash.decode_hex (hash_text);
|
||||
if (action_text == "work_generate")
|
||||
{
|
||||
handle_generate (hash);
|
||||
}
|
||||
else if (action_text == "work_cancel")
|
||||
{
|
||||
error (empty_response);
|
||||
on_cancel ();
|
||||
write_response ();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class fake_work_peer : public std::enable_shared_from_this<fake_work_peer>
|
||||
{
|
||||
public:
|
||||
fake_work_peer (nano::work_pool & pool_a, asio::io_context & ioc_a, unsigned short port_a, work_peer_type const type_a) :
|
||||
pool (pool_a),
|
||||
endpoint (tcp::v4 (), port_a),
|
||||
ioc (ioc_a),
|
||||
acceptor (ioc_a, endpoint),
|
||||
type (type_a)
|
||||
{
|
||||
}
|
||||
void start ()
|
||||
{
|
||||
listen ();
|
||||
}
|
||||
unsigned short port () const
|
||||
{
|
||||
return endpoint.port ();
|
||||
}
|
||||
std::atomic<size_t> generations_good{ 0 };
|
||||
std::atomic<size_t> generations_bad{ 0 };
|
||||
std::atomic<size_t> cancels{ 0 };
|
||||
|
||||
private:
|
||||
void listen ()
|
||||
{
|
||||
std::weak_ptr<fake_work_peer> this_w (shared_from_this ());
|
||||
auto connection (std::make_shared<work_peer_connection> (ioc, type, pool,
|
||||
[this_w](bool const good_generation) {
|
||||
if (auto this_l = this_w.lock ())
|
||||
{
|
||||
if (good_generation)
|
||||
{
|
||||
++this_l->generations_good;
|
||||
}
|
||||
else
|
||||
{
|
||||
++this_l->generations_bad;
|
||||
}
|
||||
};
|
||||
},
|
||||
[this_w]() {
|
||||
if (auto this_l = this_w.lock ())
|
||||
{
|
||||
++this_l->cancels;
|
||||
}
|
||||
}));
|
||||
acceptor.async_accept (connection->socket, [connection, this_w](beast::error_code ec) {
|
||||
if (!ec)
|
||||
{
|
||||
if (auto this_l = this_w.lock ())
|
||||
{
|
||||
connection->start ();
|
||||
this_l->listen ();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
nano::work_pool & pool;
|
||||
tcp::endpoint endpoint;
|
||||
asio::io_context & ioc;
|
||||
tcp::acceptor acceptor;
|
||||
work_peer_type const type;
|
||||
};
|
||||
}
|
|
@ -11,7 +11,7 @@ std::shared_ptr<request_type> nano::distributed_work::peer_request::get_prepared
|
|||
auto request (std::make_shared<request_type> ());
|
||||
request->method (boost::beast::http::verb::post);
|
||||
request->set (boost::beast::http::field::content_type, "application/json");
|
||||
auto address_string = boost::algorithm::erase_first_copy (address.to_string (), "::ffff:");
|
||||
auto address_string = boost::algorithm::erase_first_copy (endpoint.address ().to_string (), "::ffff:");
|
||||
request->set (boost::beast::http::field::host, address_string);
|
||||
request->target ("/");
|
||||
request->version (11);
|
||||
|
@ -69,7 +69,7 @@ void nano::distributed_work::start ()
|
|||
auto parsed_address (boost::asio::ip::make_address_v6 (current.first, ec));
|
||||
if (!ec)
|
||||
{
|
||||
outstanding[parsed_address] = current.second;
|
||||
outstanding.emplace_back (parsed_address, current.second);
|
||||
start ();
|
||||
}
|
||||
else
|
||||
|
@ -80,7 +80,7 @@ void nano::distributed_work::start ()
|
|||
for (auto i (i_a), n (boost::asio::ip::udp::resolver::iterator{}); i != n; ++i)
|
||||
{
|
||||
auto endpoint (i->endpoint ());
|
||||
this_l->outstanding[endpoint.address ()] = endpoint.port ();
|
||||
this_l->outstanding.emplace_back (endpoint.address (), endpoint.port ());
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -100,13 +100,11 @@ void nano::distributed_work::start_work ()
|
|||
if (!outstanding.empty ())
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
for (auto const & i : outstanding)
|
||||
for (auto const & endpoint : outstanding)
|
||||
{
|
||||
auto host (i.first);
|
||||
auto service (i.second);
|
||||
auto connection (std::make_shared<peer_request> (this_l->node.io_ctx, host, service));
|
||||
auto connection (std::make_shared<peer_request> (this_l->node.io_ctx, endpoint));
|
||||
connections.emplace_back (connection);
|
||||
connection->socket.async_connect (nano::tcp_endpoint (host, service),
|
||||
connection->socket.async_connect (connection->endpoint,
|
||||
boost::asio::bind_executor (strand,
|
||||
[this_l, connection](boost::system::error_code const & ec) {
|
||||
if (!ec && !this_l->stopped)
|
||||
|
@ -137,42 +135,35 @@ void nano::distributed_work::start_work ()
|
|||
{
|
||||
if (connection->response.result () == boost::beast::http::status::ok)
|
||||
{
|
||||
this_l->success (connection->response.body (), connection->address, connection->port);
|
||||
this_l->success (connection->response.body (), connection->endpoint);
|
||||
}
|
||||
else if (ec)
|
||||
{
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ()));
|
||||
this_l->add_bad_peer (connection->address, connection->port);
|
||||
this_l->failure (connection->address);
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->endpoint.address () % connection->endpoint.port () % connection->response.result ()));
|
||||
this_l->add_bad_peer (connection->endpoint);
|
||||
this_l->failure (connection->endpoint);
|
||||
}
|
||||
}
|
||||
else if (ec == boost::system::errc::operation_canceled)
|
||||
{
|
||||
// The only case where we send a cancel is if we preempt stopped waiting for the response
|
||||
this_l->cancel (*connection);
|
||||
this_l->failure (connection->address);
|
||||
}
|
||||
else if (ec)
|
||||
{
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
|
||||
this_l->add_bad_peer (connection->address, connection->port);
|
||||
this_l->failure (connection->address);
|
||||
this_l->cancel (*connection);
|
||||
this_l->failure (connection->endpoint);
|
||||
}
|
||||
}));
|
||||
}
|
||||
else if (ec && ec != boost::system::errc::operation_canceled)
|
||||
{
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
|
||||
this_l->add_bad_peer (connection->address, connection->port);
|
||||
this_l->failure (connection->address);
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->endpoint.address () % connection->endpoint.port () % ec.message () % ec.value ()));
|
||||
this_l->add_bad_peer (connection->endpoint);
|
||||
this_l->failure (connection->endpoint);
|
||||
}
|
||||
}));
|
||||
}
|
||||
else if (ec && ec != boost::system::errc::operation_canceled)
|
||||
{
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
|
||||
this_l->add_bad_peer (connection->address, connection->port);
|
||||
this_l->failure (connection->address);
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->endpoint.address () % connection->endpoint.port () % ec.message () % ec.value ()));
|
||||
this_l->add_bad_peer (connection->endpoint);
|
||||
this_l->failure (connection->endpoint);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
@ -209,8 +200,8 @@ void nano::distributed_work::start_work ()
|
|||
void nano::distributed_work::cancel (peer_request const & connection_a)
|
||||
{
|
||||
auto this_l (shared_from_this ());
|
||||
auto cancelling_l (std::make_shared<peer_request> (node.io_ctx, connection_a.address, connection_a.port));
|
||||
cancelling_l->socket.async_connect (nano::tcp_endpoint (cancelling_l->address, cancelling_l->port),
|
||||
auto cancelling_l (std::make_shared<peer_request> (node.io_ctx, connection_a.endpoint));
|
||||
cancelling_l->socket.async_connect (cancelling_l->endpoint,
|
||||
boost::asio::bind_executor (strand,
|
||||
[this_l, cancelling_l](boost::system::error_code const & ec) {
|
||||
if (!ec)
|
||||
|
@ -230,16 +221,16 @@ void nano::distributed_work::cancel (peer_request const & connection_a)
|
|||
[this_l, peer_cancel, cancelling_l](boost::system::error_code const & ec, size_t bytes_transferred) {
|
||||
if (ec && ec != boost::system::errc::operation_canceled)
|
||||
{
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling_l->address % cancelling_l->port % ec.message () % ec.value ()));
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling_l->endpoint.address () % cancelling_l->endpoint.port () % ec.message () % ec.value ()));
|
||||
}
|
||||
}));
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
void nano::distributed_work::success (std::string const & body_a, boost::asio::ip::address const & address_a, uint16_t port_a)
|
||||
void nano::distributed_work::success (std::string const & body_a, nano::tcp_endpoint const & endpoint_a)
|
||||
{
|
||||
auto last (remove (address_a));
|
||||
auto last (remove (endpoint_a));
|
||||
std::stringstream istream (body_a);
|
||||
try
|
||||
{
|
||||
|
@ -253,27 +244,27 @@ void nano::distributed_work::success (std::string const & body_a, boost::asio::i
|
|||
if (!nano::work_validate (request.root, work, &result_difficulty) && result_difficulty >= request.difficulty)
|
||||
{
|
||||
node.unresponsive_work_peers = false;
|
||||
set_once (work, boost::str (boost::format ("%1%:%2%") % address_a % port_a));
|
||||
set_once (work, boost::str (boost::format ("%1%:%2%") % endpoint_a.address () % endpoint_a.port ()));
|
||||
stop_once (true);
|
||||
}
|
||||
else
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Incorrect work response from %1%:%2% for root %3% with diffuculty %4%: %5%") % address_a % port_a % request.root.to_string () % nano::to_string_hex (request.difficulty) % work_text));
|
||||
add_bad_peer (address_a, port_a);
|
||||
node.logger.try_log (boost::str (boost::format ("Incorrect work response from %1%:%2% for root %3% with diffuculty %4%: %5%") % endpoint_a.address () % endpoint_a.port () % request.root.to_string () % nano::to_string_hex (request.difficulty) % work_text));
|
||||
add_bad_peer (endpoint_a);
|
||||
handle_failure (last);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Work response from %1%:%2% wasn't a number: %3%") % address_a % port_a % work_text));
|
||||
add_bad_peer (address_a, port_a);
|
||||
node.logger.try_log (boost::str (boost::format ("Work response from %1%:%2% wasn't a number: %3%") % endpoint_a.address () % endpoint_a.port () % work_text));
|
||||
add_bad_peer (endpoint_a);
|
||||
handle_failure (last);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Work response from %1%:%2% wasn't parsable: %3%") % address_a % port_a % body_a));
|
||||
add_bad_peer (address_a, port_a);
|
||||
node.logger.try_log (boost::str (boost::format ("Work response from %1%:%2% wasn't parsable: %3%") % endpoint_a.address () % endpoint_a.port () % body_a));
|
||||
add_bad_peer (endpoint_a);
|
||||
handle_failure (last);
|
||||
}
|
||||
}
|
||||
|
@ -302,12 +293,12 @@ void nano::distributed_work::stop_once (bool const local_stop_a)
|
|||
connection_l->socket.close (ec);
|
||||
if (ec)
|
||||
{
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ()));
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection_l->endpoint.address () % connection_l->endpoint.port () % ec.message () % ec.value ()));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ()));
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection_l->endpoint.address () % connection_l->endpoint.port () % ec.message () % ec.value ()));
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
@ -357,9 +348,9 @@ void nano::distributed_work::cancel ()
|
|||
}
|
||||
}
|
||||
|
||||
void nano::distributed_work::failure (boost::asio::ip::address const & address_a)
|
||||
void nano::distributed_work::failure (nano::tcp_endpoint const & endpoint_a)
|
||||
{
|
||||
auto last (remove (address_a));
|
||||
auto last (remove (endpoint_a));
|
||||
handle_failure (last);
|
||||
}
|
||||
|
||||
|
@ -399,15 +390,19 @@ void nano::distributed_work::handle_failure (bool const last_a)
|
|||
}
|
||||
}
|
||||
|
||||
bool nano::distributed_work::remove (boost::asio::ip::address const & address_a)
|
||||
bool nano::distributed_work::remove (nano::tcp_endpoint const & endpoint_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
outstanding.erase (address_a);
|
||||
auto existing (std::find (outstanding.begin (), outstanding.end (), endpoint_a));
|
||||
if (existing != outstanding.end ())
|
||||
{
|
||||
outstanding.erase (existing);
|
||||
}
|
||||
return outstanding.empty ();
|
||||
}
|
||||
|
||||
void nano::distributed_work::add_bad_peer (boost::asio::ip::address const & address_a, uint16_t port_a)
|
||||
void nano::distributed_work::add_bad_peer (nano::tcp_endpoint const & endpoint_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
bad_peers.emplace_back (boost::str (boost::format ("%1%:%2%") % address_a % port_a));
|
||||
bad_peers.emplace_back (boost::str (boost::format ("%1%:%2%") % endpoint_a.address () % endpoint_a.port ()));
|
||||
}
|
||||
|
|
|
@ -6,10 +6,10 @@
|
|||
#include <nano/boost/beast/http/string_body.hpp>
|
||||
#include <nano/lib/numbers.hpp>
|
||||
#include <nano/lib/timer.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
|
||||
using request_type = boost::beast::http::request<boost::beast::http::string_body>;
|
||||
|
@ -52,15 +52,13 @@ class distributed_work final : public std::enable_shared_from_this<nano::distrib
|
|||
class peer_request final
|
||||
{
|
||||
public:
|
||||
peer_request (boost::asio::io_context & io_ctx_a, boost::asio::ip::address address_a, uint16_t port_a) :
|
||||
address (address_a),
|
||||
port (port_a),
|
||||
peer_request (boost::asio::io_context & io_ctx_a, nano::tcp_endpoint const & endpoint_a) :
|
||||
endpoint (endpoint_a),
|
||||
socket (io_ctx_a)
|
||||
{
|
||||
}
|
||||
std::shared_ptr<request_type> get_prepared_json_request (std::string const &) const;
|
||||
boost::asio::ip::address address;
|
||||
uint16_t port;
|
||||
nano::tcp_endpoint const endpoint;
|
||||
boost::beast::flat_buffer buffer;
|
||||
boost::beast::http::response<boost::beast::http::string_body> response;
|
||||
boost::asio::ip::tcp::socket socket;
|
||||
|
@ -77,14 +75,14 @@ private:
|
|||
/** Cancellation is done with an entirely new connection, \p request_a is only used to copy its address and port */
|
||||
void cancel (peer_request const & request_a);
|
||||
/** Called on a successful peer response, validates the reply */
|
||||
void success (std::string const &, boost::asio::ip::address const &, uint16_t const);
|
||||
void success (std::string const &, nano::tcp_endpoint const &);
|
||||
/** Send a work_cancel message to all remaining connections */
|
||||
void stop_once (bool const);
|
||||
void set_once (uint64_t const, std::string const & source_a = "local");
|
||||
void failure (boost::asio::ip::address const &);
|
||||
void failure (nano::tcp_endpoint const &);
|
||||
void handle_failure (bool const);
|
||||
bool remove (boost::asio::ip::address const &);
|
||||
void add_bad_peer (boost::asio::ip::address const &, uint16_t const);
|
||||
bool remove (nano::tcp_endpoint const &);
|
||||
void add_bad_peer (nano::tcp_endpoint const &);
|
||||
|
||||
nano::node & node;
|
||||
nano::work_request request;
|
||||
|
@ -92,7 +90,7 @@ private:
|
|||
std::chrono::seconds backoff;
|
||||
boost::asio::strand<boost::asio::io_context::executor_type> strand;
|
||||
std::vector<std::pair<std::string, uint16_t>> need_resolve;
|
||||
std::map<boost::asio::ip::address, uint16_t> outstanding;
|
||||
std::vector<nano::tcp_endpoint> outstanding;
|
||||
std::vector<std::weak_ptr<peer_request>> connections;
|
||||
|
||||
work_generation_status status{ work_generation_status::ongoing };
|
||||
|
|
|
@ -22,7 +22,7 @@ bool nano::distributed_work_factory::make (std::chrono::seconds const & backoff_
|
|||
if (!stopped)
|
||||
{
|
||||
cleanup_finished ();
|
||||
if (node.work_generation_enabled ())
|
||||
if (node.work_generation_enabled (request_a.peers))
|
||||
{
|
||||
auto distributed (std::make_shared<nano::distributed_work> (node, request_a, backoff_a));
|
||||
{
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue