Adding concept of recent block arrival which tracks block that have arrived from UDP packets.

This commit is contained in:
clemahieu 2017-09-02 19:18:23 -05:00
commit c334c8497e
2 changed files with 119 additions and 64 deletions

View file

@ -358,6 +358,7 @@ public:
++node.network.incoming.publish;
node.peers.contacted (sender, message_a.version_using);
node.peers.insert (sender, message_a.version_using);
node.block_arrival.add (message_a.block->hash ());
node.process_receive_republish (message_a.block);
}
void confirm_req (rai::confirm_req const & message_a) override
@ -369,6 +370,7 @@ public:
++node.network.incoming.confirm_req;
node.peers.contacted (sender, message_a.version_using);
node.peers.insert (sender, message_a.version_using);
node.block_arrival.add (message_a.block->hash ());
node.process_receive_republish (message_a.block);
rai::transaction transaction_a (node.store.environment, nullptr, false);
if (node.store.block_exists (transaction_a, message_a.block->hash ()))
@ -385,6 +387,7 @@ public:
++node.network.incoming.confirm_ack;
node.peers.contacted (sender, message_a.version_using);
node.peers.insert (sender, message_a.version_using);
node.block_arrival.add (message_a.vote.block->hash ());
node.process_receive_republish (message_a.vote.block);
node.vote_processor.vote (message_a.vote, sender);
}
@ -1113,7 +1116,8 @@ void rai::block_processor::process_receive_many (std::shared_ptr <rai::block> bl
std::vector <std::shared_ptr <rai::block>> blocks;
blocks.push_back (block_a);
while (!blocks.empty ())
{
{
std::deque <std::pair <std::shared_ptr <rai::block>, rai::process_return>> progress;
{
rai::transaction transaction (node.store.environment, nullptr, true);
auto count (0);
@ -1127,6 +1131,9 @@ void rai::block_processor::process_receive_many (std::shared_ptr <rai::block> bl
switch (process_result.code)
{
case rai::process_result::progress:
{
progress.push_back (std::make_pair (block, process_result));
}
case rai::process_result::old:
{
auto cached (node.store.unchecked_get (transaction, hash));
@ -1145,6 +1152,10 @@ void rai::block_processor::process_receive_many (std::shared_ptr <rai::block> bl
++count;
}
}
for (auto & i : progress)
{
node.observers.blocks (i.first, i.second.account, i.second.amount);
}
}
}
@ -1307,99 +1318,105 @@ block_processor (*this)
};
observers.blocks.add ([this] (std::shared_ptr <rai::block> block_a, rai::account const & account_a, rai::amount const & amount_a)
{
if (!config.callback_address.empty ())
if (block_arrival.recent (block_a->hash ()))
{
boost::property_tree::ptree event;
event.add ("account", account_a.to_account ());
event.add ("hash", block_a->hash ().to_string ());
std::string block_text;
block_a->serialize_json (block_text);
event.add ("block", block_text);
event.add ("amount", amount_a.to_string_dec ());
std::stringstream ostream;
boost::property_tree::write_json (ostream, event);
ostream.flush ();
auto body (std::make_shared <std::string> (ostream.str ()));
auto address (config.callback_address);
auto port (config.callback_port);
auto target (std::make_shared <std::string> (config.callback_target));
auto node_l (shared_from_this ());
auto resolver (std::make_shared <boost::asio::ip::tcp::resolver> (service));
resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a)
background ([node_l, block_a, account_a, amount_a] ()
{
if (!ec)
if (!node_l->config.callback_address.empty ())
{
for (auto i (i_a), n (boost::asio::ip::tcp::resolver::iterator {}); i != n; ++i)
boost::property_tree::ptree event;
event.add ("account", account_a.to_account ());
event.add ("hash", block_a->hash ().to_string ());
std::string block_text;
block_a->serialize_json (block_text);
event.add ("block", block_text);
event.add ("amount", amount_a.to_string_dec ());
std::stringstream ostream;
boost::property_tree::write_json (ostream, event);
ostream.flush ();
auto body (std::make_shared <std::string> (ostream.str ()));
auto address (node_l->config.callback_address);
auto port (node_l->config.callback_port);
auto target (std::make_shared <std::string> (node_l->config.callback_target));
auto resolver (std::make_shared <boost::asio::ip::tcp::resolver> (node_l->service));
resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a)
{
auto sock (std::make_shared <boost::asio::ip::tcp::socket> (node_l->service));
sock->async_connect (i->endpoint(), [node_l, target, body, sock, address, port] (boost::system::error_code const & ec)
if (!ec)
{
if (!ec)
for (auto i (i_a), n (boost::asio::ip::tcp::resolver::iterator {}); i != n; ++i)
{
auto req (std::make_shared <boost::beast::http::request<boost::beast::http::string_body>> ());
req->method (boost::beast::http::verb::post);
req->target (*target);
req->version = 11;
req->insert(boost::beast::http::field::host, address);
req->body = *body;
//req->prepare (*req);
//boost::beast::http::prepare(req);
req->prepare_payload();
boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req] (boost::system::error_code const & ec)
auto sock (std::make_shared <boost::asio::ip::tcp::socket> (node_l->service));
sock->async_connect (i->endpoint(), [node_l, target, body, sock, address, port] (boost::system::error_code const & ec)
{
if (!ec)
{
auto sb (std::make_shared <boost::beast::flat_buffer> ());
auto resp (std::make_shared <boost::beast::http::response <boost::beast::http::string_body>> ());
boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port] (boost::system::error_code const & ec)
auto req (std::make_shared <boost::beast::http::request<boost::beast::http::string_body>> ());
req->method (boost::beast::http::verb::post);
req->target (*target);
req->version = 11;
req->insert(boost::beast::http::field::host, address);
req->body = *body;
//req->prepare (*req);
//boost::beast::http::prepare(req);
req->prepare_payload();
boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req] (boost::system::error_code const & ec)
{
if (!ec)
{
if (resp->result() == boost::beast::http::status::ok)
auto sb (std::make_shared <boost::beast::flat_buffer> ());
auto resp (std::make_shared <boost::beast::http::response <boost::beast::http::string_body>> ());
boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port] (boost::system::error_code const & ec)
{
}
else
{
if (node_l->config.logging.callback_logging ())
if (!ec)
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Callback to %1%:%2% failed with status: %3%") % address % port % resp->result());
if (resp->result() == boost::beast::http::status::ok)
{
}
else
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Callback to %1%:%2% failed with status: %3%") % address % port % resp->result());
}
}
}
}
else
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable complete callback: %1%:%2% %3%") % address % port % ec.message ());
}
};
});
}
else
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable complete callback: %1%:%2% %3%") % address % port % ec.message ());
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to send callback: %1%:%2% %3%") % address % port % ec.message ());
}
};
}
});
}
else
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to send callback: %1%:%2% %3%") % address % port % ec.message ());
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to connect to callback address: %1%:%2%, %3%") % address % port % ec.message ());
}
}
});
}
else
}
else
{
if (node_l->config.logging.callback_logging ())
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to connect to callback address: %1%:%2%, %3%") % address % port % ec.message ());
}
BOOST_LOG (node_l->log) << boost::str (boost::format ("Error resolving callback: %1%:%2%, %3%") % address % port % ec.message ());
}
});
}
}
else
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Error resolving callback: %1%:%2%, %3%") % address % port % ec.message ());
}
}
});
}
});
}
@ -1582,10 +1599,6 @@ void rai::node::process_receive_republish (std::shared_ptr <rai::block> incoming
case rai::process_result::progress:
{
node_l->active.start (transaction_a, block_a);
node_l->background ([node_l, block_a, result_a] ()
{
node_l->observers.blocks (block_a, result_a.account, result_a.amount);
});
break;
}
default:
@ -2240,6 +2253,23 @@ rai::endpoint rai::network::endpoint ()
return rai::endpoint (boost::asio::ip::address_v6::loopback (), port);
}
void rai::block_arrival::add (rai::block_hash const & hash_a)
{
std::lock_guard <std::mutex> lock (mutex);
auto now (std::chrono::system_clock::now ());
arrival.insert (rai::block_arrival_info {now, hash_a});
while (!arrival.empty () && arrival.begin ()->arrival + std::chrono::seconds (60) < now)
{
arrival.erase (arrival.begin ());
}
}
bool rai::block_arrival::recent (rai::block_hash const & hash_a)
{
std::lock_guard <std::mutex> lock (mutex);
return arrival.get <1> ().find (hash_a) != arrival.get <1> ().end ();
}
std::unordered_set <rai::endpoint> rai::peer_container::random_set (size_t count_a)
{
std::unordered_set <rai::endpoint> result;

View file

@ -267,6 +267,30 @@ public:
std::atomic <uint64_t> confirm_req;
std::atomic <uint64_t> confirm_ack;
};
class block_arrival_info
{
public:
std::chrono::system_clock::time_point arrival;
rai::block_hash hash;
};
// This class tracks blocks that are probably live because they arrived in a UDP packet
// This gives a fairly reliable way to differentiate between blocks being inserted via bootstrap or new, live blocks.
class block_arrival
{
public:
void add (rai::block_hash const &);
bool recent (rai::block_hash const &);
boost::multi_index_container
<
rai::block_arrival_info,
boost::multi_index::indexed_by
<
boost::multi_index::ordered_non_unique <boost::multi_index::member <rai::block_arrival_info, std::chrono::system_clock::time_point, &rai::block_arrival_info::arrival>>,
boost::multi_index::hashed_unique <boost::multi_index::member <rai::block_arrival_info, rai::block_hash, &rai::block_arrival_info::hash>>
>
> arrival;
std::mutex mutex;
};
class network
{
public:
@ -487,6 +511,7 @@ public:
rai::rep_crawler rep_crawler;
unsigned warmed_up;
rai::block_processor block_processor;
rai::block_arrival block_arrival;
static double constexpr price_max = 16.0;
static double constexpr free_cutoff = 1024.0;
static std::chrono::seconds constexpr period = std::chrono::seconds (60);