Revert coroutine changes for core & rpc_test (#3081)

* Revert coroutine changes for core & rpc_test

* Fix merge
This commit is contained in:
Wesley Shillingford 2021-02-02 09:57:56 +00:00 committed by GitHub
commit 8ce76120fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 65 additions and 90 deletions

View file

@ -347,16 +347,8 @@ set(Boost_USE_MULTITHREADED ON)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/Modules")
find_package(
Boost 1.69.0 REQUIRED
COMPONENTS coroutine
context
filesystem
log
log_setup
thread
program_options
system)
find_package(Boost 1.69.0 REQUIRED COMPONENTS filesystem log log_setup thread
program_options system)
# RocksDB
include_directories(rocksdb/include)
@ -549,6 +541,7 @@ if(NANO_FUZZER_TEST)
endif()
if(NANO_TEST OR RAIBLOCKS_TEST)
find_package(Boost 1.69.0 REQUIRED COMPONENTS coroutine context)
if(WIN32)
if(MSVC_VERSION)
if(MSVC_VERSION GREATER_EQUAL 1910)

0
ci/cmake-format-all.sh Normal file → Executable file
View file

View file

@ -7,4 +7,5 @@ target_link_libraries(
test_common
gtest
Boost::boost
Boost::coroutine)
Boost::coroutine
Boost::context)

View file

@ -152,8 +152,6 @@ target_link_libraries(
libminiupnpc-static
argon2
lmdb
Boost::coroutine
Boost::context
Boost::filesystem
Boost::log_setup
Boost::log

View file

@ -22,10 +22,7 @@ publish_filter (256 * 1024),
udp_channels (node_a, port_a),
tcp_channels (node_a),
port (port_a),
disconnect_observer ([]() {}),
cleanup_timer{ node_a.io_ctx },
cookie_timer{ node_a.io_ctx },
keepalive_timer{ node_a.io_ctx }
disconnect_observer ([]() {})
{
boost::thread::attributes attrs;
nano::thread_attributes::set (attrs);
@ -136,9 +133,6 @@ void nano::network::stop ()
{
thread.join ();
}
cleanup_timer.cancel ();
cookie_timer.cancel ();
keepalive_timer.cancel ();
}
}
@ -737,47 +731,38 @@ void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutof
void nano::network::ongoing_cleanup ()
{
node.spawn (
[this](boost::asio::yield_context yield) {
boost::system::error_code ec;
while (!stopped && !ec)
cleanup (std::chrono::steady_clock::now () - node.network_params.node.cutoff);
std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.node.period, [node_w]() {
if (auto node_l = node_w.lock ())
{
cleanup (std::chrono::steady_clock::now () - node.network_params.node.cutoff);
cleanup_timer.expires_from_now (node.network_params.node.period);
cleanup_timer.async_wait (yield[ec]);
node_l->network.ongoing_cleanup ();
}
debug_assert (stopped || ec == boost::asio::error::operation_aborted);
});
}
void nano::network::ongoing_syn_cookie_cleanup ()
{
node.spawn (
[this](boost::asio::yield_context yield) {
boost::system::error_code ec;
while (!stopped && !ec)
syn_cookies.purge (std::chrono::steady_clock::now () - nano::transport::syn_cookie_cutoff);
std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + (nano::transport::syn_cookie_cutoff * 2), [node_w]() {
if (auto node_l = node_w.lock ())
{
this->syn_cookies.purge (std::chrono::steady_clock::now () - nano::transport::syn_cookie_cutoff);
cookie_timer.expires_from_now (nano::transport::syn_cookie_cutoff * 2);
cookie_timer.async_wait (yield[ec]);
node_l->network.ongoing_syn_cookie_cleanup ();
}
debug_assert (stopped || ec == boost::asio::error::operation_aborted);
});
}
void nano::network::ongoing_keepalive ()
{
node.spawn (
[this](boost::asio::yield_context yield) {
boost::system::error_code ec;
while (!stopped && !ec)
flood_keepalive (0.75f);
flood_keepalive_self (0.25f);
std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.node.half_period, [node_w]() {
if (auto node_l = node_w.lock ())
{
flood_keepalive (0.75f);
flood_keepalive_self (0.25f);
keepalive_timer.expires_from_now (node.network_params.node.half_period);
keepalive_timer.async_wait (yield[ec]);
node_l->network.ongoing_keepalive ();
}
debug_assert (stopped || ec == boost::asio::error::operation_aborted);
});
}

View file

@ -195,9 +195,6 @@ public:
// Called when a new channel is observed
std::function<void(std::shared_ptr<nano::transport::channel>)> channel_observer;
std::atomic<bool> stopped{ false };
boost::asio::steady_timer cleanup_timer;
boost::asio::steady_timer cookie_timer;
boost::asio::steady_timer keepalive_timer;
static unsigned const broadcast_interval_ms = 10;
static size_t const buffer_size = 512;
static size_t const confirm_req_hashes_max = 7;

View file

@ -1,6 +1,5 @@
#pragma once
#include <nano/boost/asio/spawn.hpp>
#include <nano/lib/config.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/work.hpp>
@ -94,12 +93,6 @@ public:
{
io_ctx.post (action_a);
}
template <typename... Params>
void spawn (Params... args)
{
boost::coroutines::attributes attributes{ boost::coroutines::stack_allocator::traits_type::default_size () * (is_sanitizer_build ? 2 : 1) };
boost::asio::spawn (io_ctx, std::forward<Params> (args)..., attributes);
}
bool copy_with_compaction (boost::filesystem::path const &);
void keepalive (std::string const &, uint16_t);
void start ();

View file

@ -1,13 +1,6 @@
add_executable(rpc_test entry.cpp rpc.cpp)
target_link_libraries(
rpc_test
node
secure
rpc
test_common
gtest
Boost::coroutine)
target_link_libraries(rpc_test node secure rpc test_common gtest)
target_compile_definitions(
rpc_test

View file

@ -1,4 +1,3 @@
#include <nano/boost/asio/spawn.hpp>
#include <nano/boost/beast/core/flat_buffer.hpp>
#include <nano/boost/beast/http.hpp>
#include <nano/lib/rpcconfig.hpp>
@ -27,26 +26,23 @@ class test_response
{
public:
test_response (boost::property_tree::ptree const & request_a, boost::asio::io_context & io_ctx_a) :
request (request_a)
request (request_a),
sock (io_ctx_a)
{
}
test_response (boost::property_tree::ptree const & request_a, uint16_t port_a, boost::asio::io_context & io_ctx_a) :
request (request_a)
request (request_a),
sock (io_ctx_a)
{
run (port_a, io_ctx_a);
run (port_a);
}
void run (uint16_t port_a, boost::asio::io_context & io_ctx_a)
void run (uint16_t port_a)
{
boost::asio::spawn (io_ctx_a, [this, &io_ctx_a, port_a](boost::asio::yield_context yield) {
boost::asio::ip::tcp::socket sock (io_ctx_a);
boost::beast::flat_buffer sb;
boost::beast::http::request<boost::beast::http::string_body> req;
try
sock.async_connect (nano::tcp_endpoint (boost::asio::ip::address_v6::loopback (), port_a), [this](boost::system::error_code const & ec) {
if (!ec)
{
sock.async_connect (nano::tcp_endpoint (boost::asio::ip::address_v6::loopback (), port_a), yield);
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
req.method (boost::beast::http::verb::post);
@ -55,27 +51,46 @@ public:
ostream.flush ();
req.body () = ostream.str ();
req.prepare_payload ();
boost::beast::http::async_write (sock, req, yield);
boost::beast::http::async_read (sock, sb, resp, yield);
std::stringstream body (resp.body ());
try
{
boost::property_tree::read_json (body, json);
status = 200;
}
catch (std::exception const &)
{
status = 400;
}
boost::beast::http::async_write (sock, req, [this](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
boost::beast::http::async_read (sock, sb, resp, [this](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
std::stringstream body (resp.body ());
try
{
boost::property_tree::read_json (body, json);
status = 200;
}
catch (std::exception &)
{
status = 500;
}
}
else
{
status = 400;
}
});
}
else
{
status = 600;
}
});
}
catch (boost::system::error_code const &)
else
{
status = 400;
}
});
}
boost::property_tree::ptree const & request;
boost::asio::ip::tcp::socket sock;
boost::property_tree::ptree json;
boost::beast::flat_buffer sb;
boost::beast::http::request<boost::beast::http::string_body> req;
boost::beast::http::response<boost::beast::http::string_body> resp;
std::atomic<int> status{ 0 };
};
@ -7259,8 +7274,8 @@ TEST (rpc, simultaneous_calls)
std::atomic<int> count{ num };
for (int i = 0; i < num; ++i)
{
std::thread ([&test_responses, &promise, &count, i, port = rpc.config.port, &io_ctx = system.io_ctx]() {
test_responses[i]->run (port, io_ctx);
std::thread ([&test_responses, &promise, &count, i, port = rpc.config.port]() {
test_responses[i]->run (port);
if (--count == 0)
{
promise.set_value ();