Store io_context
as shared pointer (#4506)
This commit is contained in:
parent
5e7e7fc02e
commit
f57bcdea9d
20 changed files with 148 additions and 112 deletions
|
@ -21,14 +21,14 @@ using namespace std::chrono_literals;
|
|||
TEST (network, tcp_connection)
|
||||
{
|
||||
nano::test::system system;
|
||||
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
|
||||
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
|
||||
auto port = system.get_available_port ();
|
||||
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), port);
|
||||
acceptor.open (endpoint.protocol ());
|
||||
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
|
||||
acceptor.bind (endpoint);
|
||||
acceptor.listen ();
|
||||
boost::asio::ip::tcp::socket incoming (system.io_ctx);
|
||||
boost::asio::ip::tcp::socket incoming (*system.io_ctx);
|
||||
std::atomic<bool> done1 (false);
|
||||
std::string message1;
|
||||
acceptor.async_accept (incoming, [&done1, &message1] (boost::system::error_code const & ec_a) {
|
||||
|
@ -39,7 +39,7 @@ TEST (network, tcp_connection)
|
|||
}
|
||||
done1 = true;
|
||||
});
|
||||
boost::asio::ip::tcp::socket connector (system.io_ctx);
|
||||
boost::asio::ip::tcp::socket connector (*system.io_ctx);
|
||||
std::atomic<bool> done2 (false);
|
||||
std::string message2;
|
||||
connector.async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), acceptor.local_endpoint ().port ()),
|
||||
|
@ -538,13 +538,13 @@ TEST (network, ipv6_bind_send_ipv4)
|
|||
std::array<uint8_t, 16> bytes1{};
|
||||
std::atomic<bool> finish1{ false };
|
||||
nano::endpoint endpoint3;
|
||||
boost::asio::ip::udp::socket socket1 (system.io_ctx, endpoint1);
|
||||
boost::asio::ip::udp::socket socket1 (*system.io_ctx, endpoint1);
|
||||
socket1.async_receive_from (boost::asio::buffer (bytes1.data (), bytes1.size ()), endpoint3, [&finish1] (boost::system::error_code const & error, size_t size_a) {
|
||||
ASSERT_FALSE (error);
|
||||
ASSERT_EQ (16, size_a);
|
||||
finish1 = true;
|
||||
});
|
||||
boost::asio::ip::udp::socket socket2 (system.io_ctx, endpoint2);
|
||||
boost::asio::ip::udp::socket socket2 (*system.io_ctx, endpoint2);
|
||||
nano::endpoint endpoint5 (boost::asio::ip::address_v4::loopback (), socket1.local_endpoint ().port ());
|
||||
nano::endpoint endpoint6 (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4::loopback ()), socket2.local_endpoint ().port ());
|
||||
socket2.async_send_to (boost::asio::buffer (std::array<uint8_t, 16>{}, 16), endpoint5, [] (boost::system::error_code const & error, size_t size_a) {
|
||||
|
|
|
@ -36,7 +36,7 @@ TEST (node, stop)
|
|||
nano::test::system system (1);
|
||||
ASSERT_NE (system.nodes[0]->wallets.items.end (), system.nodes[0]->wallets.items.begin ());
|
||||
system.nodes[0]->stop ();
|
||||
system.io_ctx.run ();
|
||||
system.io_ctx->run ();
|
||||
ASSERT_TRUE (true);
|
||||
}
|
||||
|
||||
|
@ -68,10 +68,10 @@ TEST (node, work_generate)
|
|||
TEST (node, block_store_path_failure)
|
||||
{
|
||||
nano::test::system system;
|
||||
auto service (std::make_shared<boost::asio::io_context> ());
|
||||
auto io_ctx = std::make_shared<boost::asio::io_context> ();
|
||||
auto path (nano::unique_path ());
|
||||
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
|
||||
auto node (std::make_shared<nano::node> (*service, system.get_available_port (), path, pool));
|
||||
auto node (std::make_shared<nano::node> (io_ctx, system.get_available_port (), path, pool));
|
||||
ASSERT_TRUE (node->wallets.items.empty ());
|
||||
node->stop ();
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ TEST (node_DeathTest, readonly_block_store_not_exist)
|
|||
TEST (node, password_fanout)
|
||||
{
|
||||
nano::test::system system;
|
||||
boost::asio::io_context io_ctx;
|
||||
auto io_ctx = std::make_shared<boost::asio::io_context> ();
|
||||
auto path (nano::unique_path ());
|
||||
nano::node_config config;
|
||||
config.peering_port = system.get_available_port ();
|
||||
|
|
|
@ -406,7 +406,7 @@ TEST (socket, drop_policy)
|
|||
nano::inactive_node inactivenode (nano::unique_path (), node_flags);
|
||||
auto node = inactivenode.node;
|
||||
|
||||
nano::thread_runner runner (node->io_ctx, 1);
|
||||
nano::thread_runner runner (node->io_ctx_shared, 1);
|
||||
|
||||
std::vector<std::shared_ptr<nano::transport::socket>> connections;
|
||||
|
||||
|
@ -469,7 +469,7 @@ TEST (socket, concurrent_writes)
|
|||
|
||||
// This gives more realistic execution than using system#poll, allowing writes to
|
||||
// queue up and drain concurrently.
|
||||
nano::thread_runner runner (node->io_ctx, 1);
|
||||
nano::thread_runner runner (node->io_ctx_shared, 1);
|
||||
|
||||
constexpr size_t max_connections = 4;
|
||||
constexpr size_t client_count = max_connections;
|
||||
|
@ -622,13 +622,13 @@ TEST (socket_timeout, read)
|
|||
|
||||
// create a server socket
|
||||
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
|
||||
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
|
||||
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
|
||||
acceptor.open (endpoint.protocol ());
|
||||
acceptor.bind (endpoint);
|
||||
acceptor.listen (boost::asio::socket_base::max_listen_connections);
|
||||
|
||||
// asynchronously accept an incoming connection and create a newsock and do not send any data
|
||||
boost::asio::ip::tcp::socket newsock (system.io_ctx);
|
||||
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
|
||||
acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) {
|
||||
EXPECT_FALSE (ec_a);
|
||||
});
|
||||
|
@ -668,13 +668,13 @@ TEST (socket_timeout, write)
|
|||
|
||||
// create a server socket
|
||||
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
|
||||
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
|
||||
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
|
||||
acceptor.open (endpoint.protocol ());
|
||||
acceptor.bind (endpoint);
|
||||
acceptor.listen (boost::asio::socket_base::max_listen_connections);
|
||||
|
||||
// asynchronously accept an incoming connection and create a newsock and do not receive any data
|
||||
boost::asio::ip::tcp::socket newsock (system.io_ctx);
|
||||
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
|
||||
acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) {
|
||||
EXPECT_FALSE (ec_a);
|
||||
});
|
||||
|
@ -719,13 +719,13 @@ TEST (socket_timeout, read_overlapped)
|
|||
|
||||
// create a server socket
|
||||
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
|
||||
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
|
||||
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
|
||||
acceptor.open (endpoint.protocol ());
|
||||
acceptor.bind (endpoint);
|
||||
acceptor.listen (boost::asio::socket_base::max_listen_connections);
|
||||
|
||||
// asynchronously accept an incoming connection and send one byte only
|
||||
boost::asio::ip::tcp::socket newsock (system.io_ctx);
|
||||
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
|
||||
acceptor.async_accept (newsock, [&newsock] (boost::system::error_code const & ec_a) {
|
||||
EXPECT_FALSE (ec_a);
|
||||
|
||||
|
@ -777,13 +777,13 @@ TEST (socket_timeout, write_overlapped)
|
|||
|
||||
// create a server socket
|
||||
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
|
||||
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
|
||||
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
|
||||
acceptor.open (endpoint.protocol ());
|
||||
acceptor.bind (endpoint);
|
||||
acceptor.listen (boost::asio::socket_base::max_listen_connections);
|
||||
|
||||
// asynchronously accept an incoming connection and read 2 bytes only
|
||||
boost::asio::ip::tcp::socket newsock (system.io_ctx);
|
||||
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
|
||||
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
|
||||
acceptor.async_accept (newsock, [&newsock, &buffer] (boost::system::error_code const & ec_a) {
|
||||
EXPECT_FALSE (ec_a);
|
||||
|
|
|
@ -10,17 +10,20 @@
|
|||
* thread_runner
|
||||
*/
|
||||
|
||||
nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned num_threads, const nano::thread_role::name thread_role_a) :
|
||||
io_guard{ boost::asio::make_work_guard (io_ctx_a) },
|
||||
nano::thread_runner::thread_runner (std::shared_ptr<boost::asio::io_context> io_ctx_a, unsigned num_threads, const nano::thread_role::name thread_role_a) :
|
||||
io_ctx{ io_ctx_a },
|
||||
io_guard{ boost::asio::make_work_guard (*io_ctx_a) },
|
||||
role{ thread_role_a }
|
||||
{
|
||||
debug_assert (io_ctx != nullptr);
|
||||
|
||||
for (auto i (0u); i < num_threads; ++i)
|
||||
{
|
||||
threads.emplace_back (nano::thread_attributes::get_default (), [this, &io_ctx_a] () {
|
||||
threads.emplace_back (nano::thread_attributes::get_default (), [this] () {
|
||||
nano::thread_role::set (role);
|
||||
try
|
||||
{
|
||||
run (io_ctx_a);
|
||||
run (*io_ctx);
|
||||
}
|
||||
catch (std::exception const & ex)
|
||||
{
|
||||
|
@ -78,6 +81,7 @@ void nano::thread_runner::join ()
|
|||
i.join ();
|
||||
}
|
||||
}
|
||||
io_ctx.reset ();
|
||||
}
|
||||
|
||||
void nano::thread_runner::stop_event_processing ()
|
||||
|
|
|
@ -12,18 +12,20 @@ namespace nano
|
|||
class thread_runner final
|
||||
{
|
||||
public:
|
||||
thread_runner (boost::asio::io_context &, unsigned num_threads, nano::thread_role::name thread_role = nano::thread_role::name::io);
|
||||
thread_runner (std::shared_ptr<boost::asio::io_context>, unsigned num_threads, nano::thread_role::name thread_role = nano::thread_role::name::io);
|
||||
~thread_runner ();
|
||||
|
||||
/** Tells the IO context to stop processing events.*/
|
||||
void stop_event_processing ();
|
||||
|
||||
/** Wait for IO threads to complete */
|
||||
void join ();
|
||||
|
||||
private:
|
||||
std::shared_ptr<boost::asio::io_context> io_ctx;
|
||||
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> io_guard;
|
||||
nano::thread_role::name const role;
|
||||
std::vector<boost::thread> threads;
|
||||
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> io_guard;
|
||||
|
||||
private:
|
||||
void run (boost::asio::io_context &);
|
||||
|
|
|
@ -592,7 +592,8 @@ int main (int argc, char * const * argv)
|
|||
std::this_thread::sleep_for (std::chrono::seconds (7));
|
||||
std::cout << "Connecting nodes..." << std::endl;
|
||||
|
||||
boost::asio::io_context ioc;
|
||||
std::shared_ptr<boost::asio::io_context> ioc_shared = std::make_shared<boost::asio::io_context> ();
|
||||
boost::asio::io_context & ioc{ *ioc_shared };
|
||||
|
||||
debug_assert (!nano::signal_handler_impl);
|
||||
nano::signal_handler_impl = [&ioc] () {
|
||||
|
@ -715,7 +716,8 @@ int main (int argc, char * const * argv)
|
|||
// Stop main node
|
||||
stop_rpc (ioc, primary_node_results);
|
||||
});
|
||||
nano::thread_runner runner (ioc, simultaneous_process_calls);
|
||||
|
||||
nano::thread_runner runner (ioc_shared, simultaneous_process_calls);
|
||||
t.join ();
|
||||
runner.join ();
|
||||
|
||||
|
|
|
@ -98,7 +98,8 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
|
|||
config.node.websocket_config.tls_config = tls_config;
|
||||
}
|
||||
|
||||
boost::asio::io_context io_ctx;
|
||||
std::shared_ptr<boost::asio::io_context> io_ctx = std::make_shared<boost::asio::io_context> ();
|
||||
|
||||
auto opencl = nano::opencl_work::create (config.opencl_enable, config.opencl, logger, config.node.network_params.work);
|
||||
nano::opencl_work_func_t opencl_work_func;
|
||||
if (opencl)
|
||||
|
@ -132,7 +133,7 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
|
|||
config.node.peering_port = network_params.network.default_node_port;
|
||||
}
|
||||
|
||||
auto node (std::make_shared<nano::node> (io_ctx, data_path, config.node, opencl_work, flags));
|
||||
auto node = std::make_shared<nano::node> (io_ctx, data_path, config.node, opencl_work, flags);
|
||||
if (!node->init_error ())
|
||||
{
|
||||
auto network_label = node->network_params.network.get_current_network_as_string ();
|
||||
|
@ -165,10 +166,14 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
|
|||
}
|
||||
|
||||
rpc_config.tls_config = tls_config;
|
||||
rpc_handler = std::make_unique<nano::inprocess_rpc_handler> (*node, ipc_server, config.rpc, [&ipc_server, &workers = node->workers, &io_ctx] () {
|
||||
rpc_handler = std::make_unique<nano::inprocess_rpc_handler> (*node, ipc_server, config.rpc,
|
||||
[&ipc_server, &workers = node->workers, io_ctx_w = std::weak_ptr{ io_ctx }] () {
|
||||
ipc_server.stop ();
|
||||
workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [&io_ctx] () {
|
||||
io_ctx.stop ();
|
||||
workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [io_ctx_w] () {
|
||||
if (auto io_ctx_l = io_ctx_w.lock ())
|
||||
{
|
||||
io_ctx_l->stop ();
|
||||
}
|
||||
});
|
||||
});
|
||||
rpc = nano::get_rpc (io_ctx, rpc_config, *rpc_handler);
|
||||
|
@ -189,10 +194,13 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
|
|||
}
|
||||
|
||||
debug_assert (!nano::signal_handler_impl);
|
||||
nano::signal_handler_impl = [this, &io_ctx] () {
|
||||
nano::signal_handler_impl = [this, io_ctx_w = std::weak_ptr{ io_ctx }] () {
|
||||
logger.warn (nano::log::type::daemon, "Interrupt signal received, stopping...");
|
||||
|
||||
io_ctx.stop ();
|
||||
if (auto io_ctx_l = io_ctx_w.lock ())
|
||||
{
|
||||
io_ctx_l->stop ();
|
||||
}
|
||||
sig_int_or_term = 1;
|
||||
};
|
||||
|
||||
|
|
|
@ -1129,8 +1129,8 @@ int main (int argc, char * const * argv)
|
|||
}
|
||||
}
|
||||
std::cout << boost::str (boost::format ("Starting generating %1% blocks...\n") % (count * 2));
|
||||
boost::asio::io_context io_ctx1;
|
||||
boost::asio::io_context io_ctx2;
|
||||
auto io_ctx1 = std::make_shared<boost::asio::io_context> ();
|
||||
auto io_ctx2 = std::make_shared<boost::asio::io_context> ();
|
||||
nano::work_pool work{ network_params.network, std::numeric_limits<unsigned>::max () };
|
||||
auto path1 (nano::unique_path ());
|
||||
auto path2 (nano::unique_path ());
|
||||
|
@ -1283,8 +1283,8 @@ int main (int argc, char * const * argv)
|
|||
auto end (std::chrono::high_resolution_clock::now ());
|
||||
auto time (std::chrono::duration_cast<std::chrono::microseconds> (end - begin).count ());
|
||||
std::cout << boost::str (boost::format ("%|1$ 12d| us \n%2% frontiers per second\n") % time % ((count + 1) * 1000000 / time));
|
||||
io_ctx1.stop ();
|
||||
io_ctx2.stop ();
|
||||
io_ctx1->stop ();
|
||||
io_ctx2->stop ();
|
||||
runner1.join ();
|
||||
runner2.join ();
|
||||
node1->stop ();
|
||||
|
|
|
@ -49,17 +49,23 @@ void run (std::filesystem::path const & data_path, std::vector<std::string> cons
|
|||
rpc_config.tls_config = tls_config;
|
||||
}
|
||||
|
||||
boost::asio::io_context io_ctx;
|
||||
std::shared_ptr<boost::asio::io_context> io_ctx = std::make_shared<boost::asio::io_context> ();
|
||||
|
||||
nano::signal_manager sigman;
|
||||
try
|
||||
{
|
||||
nano::ipc_rpc_processor ipc_rpc_processor (io_ctx, rpc_config);
|
||||
nano::ipc_rpc_processor ipc_rpc_processor (*io_ctx, rpc_config);
|
||||
auto rpc = nano::get_rpc (io_ctx, rpc_config, ipc_rpc_processor);
|
||||
rpc->start ();
|
||||
|
||||
debug_assert (!nano::signal_handler_impl);
|
||||
nano::signal_handler_impl = [&io_ctx] () {
|
||||
io_ctx.stop ();
|
||||
nano::signal_handler_impl = [io_ctx_w = std::weak_ptr{ io_ctx }] () {
|
||||
logger.warn (nano::log::type::daemon, "Interrupt signal received, stopping...");
|
||||
|
||||
if (auto io_ctx_l = io_ctx_w.lock ())
|
||||
{
|
||||
io_ctx_l->stop ();
|
||||
}
|
||||
sig_int_or_term = 1;
|
||||
};
|
||||
|
||||
|
|
|
@ -122,7 +122,8 @@ int run_wallet (QApplication & application, int argc, char * const * argv, std::
|
|||
config.node.websocket_config.tls_config = tls_config;
|
||||
}
|
||||
|
||||
boost::asio::io_context io_ctx;
|
||||
std::shared_ptr<boost::asio::io_context> io_ctx = std::make_shared<boost::asio::io_context> ();
|
||||
|
||||
nano::thread_runner runner (io_ctx, config.node.io_threads);
|
||||
|
||||
std::shared_ptr<nano::node> node;
|
||||
|
|
|
@ -463,12 +463,13 @@ class socket_transport : public nano::ipc::transport
|
|||
{
|
||||
public:
|
||||
socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE endpoint_a, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) :
|
||||
server (server_a), config_transport (config_transport_a)
|
||||
server (server_a),
|
||||
config_transport (config_transport_a)
|
||||
{
|
||||
// Using a per-transport event dispatcher?
|
||||
if (concurrency_a > 0)
|
||||
{
|
||||
io_ctx = std::make_unique<boost::asio::io_context> ();
|
||||
io_ctx = std::make_shared<boost::asio::io_context> ();
|
||||
}
|
||||
|
||||
boost::asio::socket_base::reuse_address option (true);
|
||||
|
@ -482,7 +483,7 @@ public:
|
|||
// A separate io_context for domain sockets may facilitate better performance on some systems.
|
||||
if (concurrency_a > 0)
|
||||
{
|
||||
runner = std::make_unique<nano::thread_runner> (*io_ctx, static_cast<unsigned> (concurrency_a));
|
||||
runner = std::make_unique<nano::thread_runner> (io_ctx, static_cast<unsigned> (concurrency_a));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -544,7 +545,7 @@ private:
|
|||
nano::ipc::ipc_server & server;
|
||||
nano::ipc::ipc_config_transport & config_transport;
|
||||
std::unique_ptr<nano::thread_runner> runner;
|
||||
std::unique_ptr<boost::asio::io_context> io_ctx;
|
||||
std::shared_ptr<boost::asio::io_context> io_ctx;
|
||||
std::unique_ptr<ACCEPTOR_TYPE> acceptor;
|
||||
};
|
||||
|
||||
|
|
|
@ -120,15 +120,16 @@ nano::keypair nano::load_or_create_node_id (std::filesystem::path const & applic
|
|||
}
|
||||
}
|
||||
|
||||
nano::node::node (boost::asio::io_context & io_ctx_a, uint16_t peering_port_a, std::filesystem::path const & application_path_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) :
|
||||
nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, uint16_t peering_port_a, std::filesystem::path const & application_path_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) :
|
||||
node (io_ctx_a, application_path_a, nano::node_config (peering_port_a), work_a, flags_a, seq)
|
||||
{
|
||||
}
|
||||
|
||||
nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path const & application_path_a, nano::node_config const & config_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) :
|
||||
nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesystem::path const & application_path_a, nano::node_config const & config_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) :
|
||||
io_ctx_shared{ io_ctx_a },
|
||||
io_ctx{ *io_ctx_shared },
|
||||
node_id{ load_or_create_node_id (application_path_a) },
|
||||
write_database_queue (!flags_a.force_use_write_database_queue && (config_a.rocksdb_config.enable)),
|
||||
io_ctx (io_ctx_a),
|
||||
node_initialized_latch (1),
|
||||
config (config_a),
|
||||
network_params{ config.network_params },
|
||||
|
@ -1380,7 +1381,7 @@ nano::node_wrapper::node_wrapper (std::filesystem::path const & path_a, std::fil
|
|||
auto & node_config = daemon_config.node;
|
||||
node_config.peering_port = 24000;
|
||||
|
||||
node = std::make_shared<nano::node> (*io_context, path_a, node_config, work, node_flags_a);
|
||||
node = std::make_shared<nano::node> (io_context, path_a, node_config, work, node_flags_a);
|
||||
}
|
||||
|
||||
nano::node_wrapper::~node_wrapper ()
|
||||
|
|
|
@ -63,11 +63,11 @@ namespace scheduler
|
|||
backlog_population::config backlog_population_config (node_config const &);
|
||||
outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &);
|
||||
|
||||
class node final : public std::enable_shared_from_this<nano::node>
|
||||
class node final : public std::enable_shared_from_this<node>
|
||||
{
|
||||
public:
|
||||
node (boost::asio::io_context &, uint16_t, std::filesystem::path const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0);
|
||||
node (boost::asio::io_context &, std::filesystem::path const &, nano::node_config const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0);
|
||||
node (std::shared_ptr<boost::asio::io_context>, uint16_t peering_port, std::filesystem::path const & application_path, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0);
|
||||
node (std::shared_ptr<boost::asio::io_context>, std::filesystem::path const & application_path, nano::node_config const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0);
|
||||
~node ();
|
||||
|
||||
public:
|
||||
|
@ -133,6 +133,7 @@ public:
|
|||
public:
|
||||
const nano::keypair node_id;
|
||||
nano::write_database_queue write_database_queue;
|
||||
std::shared_ptr<boost::asio::io_context> io_ctx_shared;
|
||||
boost::asio::io_context & io_ctx;
|
||||
boost::latch node_initialized_latch;
|
||||
nano::node_config config;
|
||||
|
|
|
@ -12,10 +12,11 @@
|
|||
#include <nano/rpc/rpc_secure.hpp>
|
||||
#endif
|
||||
|
||||
nano::rpc::rpc (boost::asio::io_context & io_ctx_a, nano::rpc_config config_a, nano::rpc_handler_interface & rpc_handler_interface_a) :
|
||||
nano::rpc::rpc (std::shared_ptr<boost::asio::io_context> io_ctx_a, nano::rpc_config config_a, nano::rpc_handler_interface & rpc_handler_interface_a) :
|
||||
config (std::move (config_a)),
|
||||
acceptor (io_ctx_a),
|
||||
io_ctx (io_ctx_a),
|
||||
io_ctx_shared (io_ctx_a),
|
||||
io_ctx (*io_ctx_shared),
|
||||
acceptor (io_ctx),
|
||||
rpc_handler_interface (rpc_handler_interface_a)
|
||||
{
|
||||
rpc_handler_interface.rpc_instance (*this);
|
||||
|
@ -78,7 +79,7 @@ void nano::rpc::stop ()
|
|||
acceptor.close ();
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::rpc> nano::get_rpc (boost::asio::io_context & io_ctx_a, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a)
|
||||
std::unique_ptr<nano::rpc> nano::get_rpc (std::shared_ptr<boost::asio::io_context> io_ctx_a, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a)
|
||||
{
|
||||
std::unique_ptr<rpc> impl;
|
||||
|
||||
|
|
|
@ -20,25 +20,29 @@ class rpc_handler_interface;
|
|||
class rpc
|
||||
{
|
||||
public:
|
||||
rpc (boost::asio::io_context & io_ctx_a, nano::rpc_config config_a, nano::rpc_handler_interface & rpc_handler_interface_a);
|
||||
rpc (std::shared_ptr<boost::asio::io_context>, nano::rpc_config config_a, nano::rpc_handler_interface & rpc_handler_interface_a);
|
||||
virtual ~rpc ();
|
||||
|
||||
void start ();
|
||||
virtual void accept ();
|
||||
void stop ();
|
||||
|
||||
std::uint16_t listening_port ()
|
||||
virtual void accept ();
|
||||
|
||||
std::uint16_t listening_port () const
|
||||
{
|
||||
return acceptor.local_endpoint ().port ();
|
||||
}
|
||||
|
||||
public:
|
||||
nano::logger logger{ "rpc" };
|
||||
nano::rpc_config config;
|
||||
boost::asio::ip::tcp::acceptor acceptor;
|
||||
std::shared_ptr<boost::asio::io_context> io_ctx_shared;
|
||||
boost::asio::io_context & io_ctx;
|
||||
boost::asio::ip::tcp::acceptor acceptor;
|
||||
nano::rpc_handler_interface & rpc_handler_interface;
|
||||
bool stopped{ false };
|
||||
};
|
||||
|
||||
/** Returns the correct RPC implementation based on TLS configuration */
|
||||
std::unique_ptr<nano::rpc> get_rpc (boost::asio::io_context & io_ctx_a, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a);
|
||||
std::unique_ptr<nano::rpc> get_rpc (std::shared_ptr<boost::asio::io_context>, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a);
|
||||
}
|
||||
|
|
|
@ -1741,7 +1741,7 @@ TEST (rpc, version)
|
|||
auto const rpc_ctx = add_rpc (system, node1);
|
||||
boost::property_tree::ptree request1;
|
||||
request1.put ("action", "version");
|
||||
test_response response1 (request1, rpc_ctx.rpc->listening_port (), system.io_ctx);
|
||||
test_response response1 (request1, rpc_ctx.rpc->listening_port (), *system.io_ctx);
|
||||
ASSERT_TIMELY (5s, response1.status != 0);
|
||||
ASSERT_EQ (200, response1.status);
|
||||
ASSERT_EQ ("1", response1.json.get<std::string> ("rpc_version"));
|
||||
|
@ -2506,7 +2506,7 @@ TEST (rpc, bootstrap)
|
|||
request.put ("action", "bootstrap");
|
||||
request.put ("address", "::ffff:127.0.0.1");
|
||||
request.put ("port", node1->network.endpoint ().port ());
|
||||
test_response response (request, rpc_ctx.rpc->listening_port (), system0.io_ctx);
|
||||
test_response response (request, rpc_ctx.rpc->listening_port (), *system0.io_ctx);
|
||||
while (response.status == 0)
|
||||
{
|
||||
system0.poll ();
|
||||
|
@ -6046,7 +6046,7 @@ TEST (rpc, simultaneous_calls)
|
|||
const auto ipc_tcp_port = ipc_server.listening_tcp_port ();
|
||||
ASSERT_TRUE (ipc_tcp_port.has_value ());
|
||||
rpc_config.rpc_process.num_ipc_connections = 8;
|
||||
nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config, ipc_tcp_port.value ());
|
||||
nano::ipc_rpc_processor ipc_rpc_processor (*system.io_ctx, rpc_config, ipc_tcp_port.value ());
|
||||
nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor);
|
||||
rpc.start ();
|
||||
boost::property_tree::ptree request;
|
||||
|
@ -6057,7 +6057,7 @@ TEST (rpc, simultaneous_calls)
|
|||
std::array<std::unique_ptr<test_response>, num> test_responses;
|
||||
for (int i = 0; i < num; ++i)
|
||||
{
|
||||
test_responses[i] = std::make_unique<test_response> (request, system.io_ctx);
|
||||
test_responses[i] = std::make_unique<test_response> (request, *system.io_ctx);
|
||||
}
|
||||
|
||||
std::promise<void> promise;
|
||||
|
@ -6087,7 +6087,7 @@ TEST (rpc, simultaneous_calls)
|
|||
rpc.stop ();
|
||||
system.stop ();
|
||||
ipc_server.stop ();
|
||||
system.io_ctx.stop ();
|
||||
system.io_ctx->stop ();
|
||||
runner.join ();
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ nano::test::rpc_context::rpc_context (std::shared_ptr<nano::rpc> & rpc_a, std::u
|
|||
|
||||
void nano::test::wait_response_impl (nano::test::system & system, rpc_context const & rpc_ctx, boost::property_tree::ptree & request, std::chrono::duration<double, std::nano> const & time, boost::property_tree::ptree & response_json)
|
||||
{
|
||||
test_response response (request, rpc_ctx.rpc->listening_port (), system.io_ctx);
|
||||
test_response response (request, rpc_ctx.rpc->listening_port (), *system.io_ctx);
|
||||
ASSERT_TIMELY (time, response.status != 0);
|
||||
ASSERT_EQ (200, response.status);
|
||||
response_json = response.json;
|
||||
|
@ -49,7 +49,7 @@ nano::test::rpc_context nano::test::add_rpc (nano::test::system & system, std::s
|
|||
nano::rpc_config rpc_config (node_a->network_params.network, system.get_available_port (), true);
|
||||
const auto ipc_tcp_port = ipc_server->listening_tcp_port ();
|
||||
debug_assert (ipc_tcp_port.has_value ());
|
||||
auto ipc_rpc_processor (std::make_unique<nano::ipc_rpc_processor> (system.io_ctx, rpc_config, ipc_tcp_port.value ()));
|
||||
auto ipc_rpc_processor (std::make_unique<nano::ipc_rpc_processor> (*system.io_ctx, rpc_config, ipc_tcp_port.value ()));
|
||||
auto rpc (std::make_shared<nano::rpc> (system.io_ctx, rpc_config, *ipc_rpc_processor));
|
||||
rpc->start ();
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ public:
|
|||
node_rpc_config{},
|
||||
rpc_config{ node.network_params.network, port, true },
|
||||
ipc{ node, node_rpc_config },
|
||||
ipc_rpc_processor{ system.io_ctx, rpc_config },
|
||||
ipc_rpc_processor{ *system.io_ctx, rpc_config },
|
||||
rpc{ system.io_ctx, rpc_config, ipc_rpc_processor }
|
||||
{
|
||||
}
|
||||
|
|
|
@ -25,6 +25,49 @@ std::string nano::error_system_messages::message (int ev) const
|
|||
return "Invalid error code";
|
||||
}
|
||||
|
||||
/*
|
||||
* system
|
||||
*/
|
||||
|
||||
nano::test::system::system () :
|
||||
io_ctx{ std::make_shared<boost::asio::io_context> () }
|
||||
{
|
||||
auto scale_str = std::getenv ("DEADLINE_SCALE_FACTOR");
|
||||
if (scale_str)
|
||||
{
|
||||
deadline_scaling_factor = std::stod (scale_str);
|
||||
}
|
||||
}
|
||||
|
||||
nano::test::system::system (uint16_t count_a, nano::transport::transport_type type_a, nano::node_flags flags_a) :
|
||||
system ()
|
||||
{
|
||||
nodes.reserve (count_a);
|
||||
for (uint16_t i (0); i < count_a; ++i)
|
||||
{
|
||||
add_node (default_config (), flags_a, type_a);
|
||||
}
|
||||
}
|
||||
|
||||
nano::test::system::~system ()
|
||||
{
|
||||
// Only stop system in destructor to avoid confusing and random bugs when debugging assertions that hit deadline expired condition
|
||||
stop ();
|
||||
|
||||
#ifndef _WIN32
|
||||
// Windows cannot remove the log and data files while they are still owned by this process.
|
||||
// They will be removed later
|
||||
|
||||
// Clean up tmp directories created by the tests. Since it's sometimes useful to
|
||||
// see log files after test failures, an environment variable is supported to
|
||||
// retain the files.
|
||||
if (std::getenv ("TEST_KEEP_TMPDIRS") == nullptr)
|
||||
{
|
||||
nano::remove_temporary_directories ();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
nano::node & nano::test::system::node (std::size_t index) const
|
||||
{
|
||||
debug_assert (index < nodes.size ());
|
||||
|
@ -142,44 +185,6 @@ std::shared_ptr<nano::node> nano::test::system::make_disconnected_node (std::opt
|
|||
return node;
|
||||
}
|
||||
|
||||
nano::test::system::system ()
|
||||
{
|
||||
auto scale_str = std::getenv ("DEADLINE_SCALE_FACTOR");
|
||||
if (scale_str)
|
||||
{
|
||||
deadline_scaling_factor = std::stod (scale_str);
|
||||
}
|
||||
}
|
||||
|
||||
nano::test::system::system (uint16_t count_a, nano::transport::transport_type type_a, nano::node_flags flags_a) :
|
||||
system ()
|
||||
{
|
||||
nodes.reserve (count_a);
|
||||
for (uint16_t i (0); i < count_a; ++i)
|
||||
{
|
||||
add_node (default_config (), flags_a, type_a);
|
||||
}
|
||||
}
|
||||
|
||||
nano::test::system::~system ()
|
||||
{
|
||||
// Only stop system in destructor to avoid confusing and random bugs when debugging assertions that hit deadline expired condition
|
||||
stop ();
|
||||
|
||||
#ifndef _WIN32
|
||||
// Windows cannot remove the log and data files while they are still owned by this process.
|
||||
// They will be removed later
|
||||
|
||||
// Clean up tmp directories created by the tests. Since it's sometimes useful to
|
||||
// see log files after test failures, an environment variable is supported to
|
||||
// retain the files.
|
||||
if (std::getenv ("TEST_KEEP_TMPDIRS") == nullptr)
|
||||
{
|
||||
nano::remove_temporary_directories ();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void nano::test::system::ledger_initialization_set (std::vector<nano::keypair> const & reps, nano::amount const & reserve)
|
||||
{
|
||||
nano::block_hash previous = nano::dev::genesis->hash ();
|
||||
|
@ -285,7 +290,7 @@ void nano::test::system::deadline_set (std::chrono::duration<double, std::nano>
|
|||
std::error_code nano::test::system::poll (std::chrono::nanoseconds const & wait_time)
|
||||
{
|
||||
#if NANO_ASIO_HANDLER_TRACKING == 0
|
||||
io_ctx.run_one_for (wait_time);
|
||||
io_ctx->run_one_for (wait_time);
|
||||
#else
|
||||
nano::timer<> timer;
|
||||
timer.start ();
|
||||
|
@ -331,7 +336,7 @@ void nano::test::system::delay_ms (std::chrono::milliseconds const & delay)
|
|||
auto endtime = now + delay;
|
||||
while (now <= endtime)
|
||||
{
|
||||
io_ctx.run_one_for (endtime - now);
|
||||
io_ctx->run_one_for (endtime - now);
|
||||
now = std::chrono::steady_clock::now ();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,7 +74,7 @@ namespace test
|
|||
uint16_t get_available_port ();
|
||||
|
||||
public:
|
||||
boost::asio::io_context io_ctx;
|
||||
std::shared_ptr<boost::asio::io_context> io_ctx;
|
||||
std::vector<std::shared_ptr<nano::node>> nodes;
|
||||
nano::stats stats;
|
||||
nano::logger logger{ "tests" };
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue