diff --git a/rai/node/node.cpp b/rai/node/node.cpp index 767e4690..58906b98 100755 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -358,6 +358,7 @@ public: ++node.network.incoming.publish; node.peers.contacted (sender, message_a.version_using); node.peers.insert (sender, message_a.version_using); + node.block_arrival.add (message_a.block->hash ()); node.process_receive_republish (message_a.block); } void confirm_req (rai::confirm_req const & message_a) override @@ -369,6 +370,7 @@ public: ++node.network.incoming.confirm_req; node.peers.contacted (sender, message_a.version_using); node.peers.insert (sender, message_a.version_using); + node.block_arrival.add (message_a.block->hash ()); node.process_receive_republish (message_a.block); rai::transaction transaction_a (node.store.environment, nullptr, false); if (node.store.block_exists (transaction_a, message_a.block->hash ())) @@ -385,6 +387,7 @@ public: ++node.network.incoming.confirm_ack; node.peers.contacted (sender, message_a.version_using); node.peers.insert (sender, message_a.version_using); + node.block_arrival.add (message_a.vote.block->hash ()); node.process_receive_republish (message_a.vote.block); node.vote_processor.vote (message_a.vote, sender); } @@ -1113,7 +1116,8 @@ void rai::block_processor::process_receive_many (std::shared_ptr bl std::vector > blocks; blocks.push_back (block_a); while (!blocks.empty ()) - { + { + std::deque , rai::process_return>> progress; { rai::transaction transaction (node.store.environment, nullptr, true); auto count (0); @@ -1127,6 +1131,9 @@ void rai::block_processor::process_receive_many (std::shared_ptr bl switch (process_result.code) { case rai::process_result::progress: + { + progress.push_back (std::make_pair (block, process_result)); + } case rai::process_result::old: { auto cached (node.store.unchecked_get (transaction, hash)); @@ -1145,6 +1152,10 @@ void rai::block_processor::process_receive_many (std::shared_ptr bl ++count; } } + for (auto & i : progress) + { + node.observers.blocks (i.first, i.second.account, i.second.amount); + } } } @@ -1307,99 +1318,105 @@ block_processor (*this) }; observers.blocks.add ([this] (std::shared_ptr block_a, rai::account const & account_a, rai::amount const & amount_a) { - if (!config.callback_address.empty ()) + if (block_arrival.recent (block_a->hash ())) { - boost::property_tree::ptree event; - event.add ("account", account_a.to_account ()); - event.add ("hash", block_a->hash ().to_string ()); - std::string block_text; - block_a->serialize_json (block_text); - event.add ("block", block_text); - event.add ("amount", amount_a.to_string_dec ()); - std::stringstream ostream; - boost::property_tree::write_json (ostream, event); - ostream.flush (); - auto body (std::make_shared (ostream.str ())); - auto address (config.callback_address); - auto port (config.callback_port); - auto target (std::make_shared (config.callback_target)); auto node_l (shared_from_this ()); - auto resolver (std::make_shared (service)); - resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) + background ([node_l, block_a, account_a, amount_a] () { - if (!ec) + if (!node_l->config.callback_address.empty ()) { - for (auto i (i_a), n (boost::asio::ip::tcp::resolver::iterator {}); i != n; ++i) + boost::property_tree::ptree event; + event.add ("account", account_a.to_account ()); + event.add ("hash", block_a->hash ().to_string ()); + std::string block_text; + block_a->serialize_json (block_text); + event.add ("block", block_text); + event.add ("amount", amount_a.to_string_dec ()); + std::stringstream ostream; + boost::property_tree::write_json (ostream, event); + ostream.flush (); + auto body (std::make_shared (ostream.str ())); + auto address (node_l->config.callback_address); + auto port (node_l->config.callback_port); + auto target (std::make_shared (node_l->config.callback_target)); + auto resolver (std::make_shared (node_l->service)); + resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) { - auto sock (std::make_shared (node_l->service)); - sock->async_connect (i->endpoint(), [node_l, target, body, sock, address, port] (boost::system::error_code const & ec) + if (!ec) { - if (!ec) + for (auto i (i_a), n (boost::asio::ip::tcp::resolver::iterator {}); i != n; ++i) { - auto req (std::make_shared > ()); - req->method (boost::beast::http::verb::post); - req->target (*target); - req->version = 11; - req->insert(boost::beast::http::field::host, address); - req->body = *body; - //req->prepare (*req); - //boost::beast::http::prepare(req); - req->prepare_payload(); - boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req] (boost::system::error_code const & ec) + auto sock (std::make_shared (node_l->service)); + sock->async_connect (i->endpoint(), [node_l, target, body, sock, address, port] (boost::system::error_code const & ec) { if (!ec) { - auto sb (std::make_shared ()); - auto resp (std::make_shared > ()); - boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port] (boost::system::error_code const & ec) + auto req (std::make_shared > ()); + req->method (boost::beast::http::verb::post); + req->target (*target); + req->version = 11; + req->insert(boost::beast::http::field::host, address); + req->body = *body; + //req->prepare (*req); + //boost::beast::http::prepare(req); + req->prepare_payload(); + boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req] (boost::system::error_code const & ec) { if (!ec) { - if (resp->result() == boost::beast::http::status::ok) + auto sb (std::make_shared ()); + auto resp (std::make_shared > ()); + boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port] (boost::system::error_code const & ec) { - } - else - { - if (node_l->config.logging.callback_logging ()) + if (!ec) { - BOOST_LOG (node_l->log) << boost::str (boost::format ("Callback to %1%:%2% failed with status: %3%") % address % port % resp->result()); + if (resp->result() == boost::beast::http::status::ok) + { + } + else + { + if (node_l->config.logging.callback_logging ()) + { + BOOST_LOG (node_l->log) << boost::str (boost::format ("Callback to %1%:%2% failed with status: %3%") % address % port % resp->result()); + } + } } - } + else + { + if (node_l->config.logging.callback_logging ()) + { + BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable complete callback: %1%:%2% %3%") % address % port % ec.message ()); + } + }; + }); } else { if (node_l->config.logging.callback_logging ()) { - BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable complete callback: %1%:%2% %3%") % address % port % ec.message ()); + BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to send callback: %1%:%2% %3%") % address % port % ec.message ()); } - }; + } }); } else { if (node_l->config.logging.callback_logging ()) { - BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to send callback: %1%:%2% %3%") % address % port % ec.message ()); + BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to connect to callback address: %1%:%2%, %3%") % address % port % ec.message ()); } } }); } - else + } + else + { + if (node_l->config.logging.callback_logging ()) { - if (node_l->config.logging.callback_logging ()) - { - BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to connect to callback address: %1%:%2%, %3%") % address % port % ec.message ()); - } + BOOST_LOG (node_l->log) << boost::str (boost::format ("Error resolving callback: %1%:%2%, %3%") % address % port % ec.message ()); } - }); - } - } - else - { - if (node_l->config.logging.callback_logging ()) - { - BOOST_LOG (node_l->log) << boost::str (boost::format ("Error resolving callback: %1%:%2%, %3%") % address % port % ec.message ()); - } + } + }); } }); } @@ -1582,10 +1599,6 @@ void rai::node::process_receive_republish (std::shared_ptr incoming case rai::process_result::progress: { node_l->active.start (transaction_a, block_a); - node_l->background ([node_l, block_a, result_a] () - { - node_l->observers.blocks (block_a, result_a.account, result_a.amount); - }); break; } default: @@ -2240,6 +2253,23 @@ rai::endpoint rai::network::endpoint () return rai::endpoint (boost::asio::ip::address_v6::loopback (), port); } +void rai::block_arrival::add (rai::block_hash const & hash_a) +{ + std::lock_guard lock (mutex); + auto now (std::chrono::system_clock::now ()); + arrival.insert (rai::block_arrival_info {now, hash_a}); + while (!arrival.empty () && arrival.begin ()->arrival + std::chrono::seconds (60) < now) + { + arrival.erase (arrival.begin ()); + } +} + +bool rai::block_arrival::recent (rai::block_hash const & hash_a) +{ + std::lock_guard lock (mutex); + return arrival.get <1> ().find (hash_a) != arrival.get <1> ().end (); +} + std::unordered_set rai::peer_container::random_set (size_t count_a) { std::unordered_set result; diff --git a/rai/node/node.hpp b/rai/node/node.hpp index c4bf078a..ee9824a2 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -267,6 +267,30 @@ public: std::atomic confirm_req; std::atomic confirm_ack; }; +class block_arrival_info +{ +public: + std::chrono::system_clock::time_point arrival; + rai::block_hash hash; +}; +// This class tracks blocks that are probably live because they arrived in a UDP packet +// This gives a fairly reliable way to differentiate between blocks being inserted via bootstrap or new, live blocks. +class block_arrival +{ +public: + void add (rai::block_hash const &); + bool recent (rai::block_hash const &); + boost::multi_index_container + < + rai::block_arrival_info, + boost::multi_index::indexed_by + < + boost::multi_index::ordered_non_unique >, + boost::multi_index::hashed_unique > + > + > arrival; + std::mutex mutex; +}; class network { public: @@ -487,6 +511,7 @@ public: rai::rep_crawler rep_crawler; unsigned warmed_up; rai::block_processor block_processor; + rai::block_arrival block_arrival; static double constexpr price_max = 16.0; static double constexpr free_cutoff = 1024.0; static std::chrono::seconds constexpr period = std::chrono::seconds (60);