From e1ce480ebbb71f07f005543d409efefe33c35200 Mon Sep 17 00:00:00 2001 From: Russel Waters Date: Wed, 10 Oct 2018 19:40:01 -0400 Subject: [PATCH] converting to boost threads and setting 8MB thread stacks (#1289) converting to boost threads and setting 8MB thread stack size universally --- rai/core_test/network.cpp | 8 ++++---- rai/core_test/processor_service.cpp | 8 ++++---- rai/core_test/rpc.cpp | 4 ++-- rai/core_test/wallet.cpp | 2 +- rai/lib/interface.cpp | 2 +- rai/lib/utility.cpp | 6 ++++++ rai/lib/utility.hpp | 6 ++++++ rai/lib/work.cpp | 6 ++++-- rai/lib/work.hpp | 3 ++- rai/node/bootstrap.hpp | 3 ++- rai/node/node.cpp | 14 +++++++++----- rai/node/node.hpp | 13 +++++++------ rai/node/wallet.hpp | 3 ++- rai/slow_test/node.cpp | 10 +++++----- 14 files changed, 55 insertions(+), 33 deletions(-) diff --git a/rai/core_test/network.cpp b/rai/core_test/network.cpp index e49184e3..a9168778 100644 --- a/rai/core_test/network.cpp +++ b/rai/core_test/network.cpp @@ -1013,7 +1013,7 @@ TEST (udp_buffer, one_buffer_multithreaded) { rai::stat stats; rai::udp_buffer buffer (stats, 512, 1); - std::thread thread ([&buffer]() { + boost::thread thread ([&buffer]() { auto done (false); while (!done) { @@ -1038,10 +1038,10 @@ TEST (udp_buffer, many_buffers_multithreaded) { rai::stat stats; rai::udp_buffer buffer (stats, 512, 16); - std::vector threads; + std::vector threads; for (auto i (0); i < 4; ++i) { - threads.push_back (std::thread ([&buffer]() { + threads.push_back (boost::thread ([&buffer]() { auto done (false); while (!done) { @@ -1057,7 +1057,7 @@ TEST (udp_buffer, many_buffers_multithreaded) std::atomic_int count (0); for (auto i (0); i < 4; ++i) { - threads.push_back (std::thread ([&buffer, &count]() { + threads.push_back (boost::thread ([&buffer, &count]() { auto done (false); for (auto i (0); !done && i < 1000; ++i) { diff --git a/rai/core_test/processor_service.cpp b/rai/core_test/processor_service.cpp index 7411fed3..2638fae0 100644 --- a/rai/core_test/processor_service.cpp +++ b/rai/core_test/processor_service.cpp @@ -60,7 +60,7 @@ TEST (alarm, one) condition.notify_one (); }); boost::asio::io_service::work work (service); - std::thread thread ([&service]() { service.run (); }); + boost::thread thread ([&service]() { service.run (); }); std::unique_lock unique (mutex); condition.wait (unique, [&]() { return !!done; }); service.stop (); @@ -83,10 +83,10 @@ TEST (alarm, many) }); } boost::asio::io_service::work work (service); - std::vector threads; + std::vector threads; for (auto i (0); i < 50; ++i) { - threads.push_back (std::thread ([&service]() { service.run (); })); + threads.push_back (boost::thread ([&service]() { service.run (); })); } std::unique_lock unique (mutex); condition.wait (unique, [&]() { return count == 50; }); @@ -116,7 +116,7 @@ TEST (alarm, top_execution) promise.set_value (false); }); boost::asio::io_service::work work (service); - std::thread thread ([&service]() { + boost::thread thread ([&service]() { service.run (); }); promise.get_future ().get (); diff --git a/rai/core_test/rpc.cpp b/rai/core_test/rpc.cpp index 8a4c9c99..f49ee85b 100644 --- a/rai/core_test/rpc.cpp +++ b/rai/core_test/rpc.cpp @@ -253,7 +253,7 @@ TEST (rpc, send) request.put ("source", rai::test_genesis_key.pub.to_account ()); request.put ("destination", rai::test_genesis_key.pub.to_account ()); request.put ("amount", "100"); - std::thread thread2 ([&system]() { + boost::thread thread2 ([&system]() { system.deadline_set (10s); while (system.nodes[0]->balance (rai::test_genesis_key.pub) == rai::genesis_amount) { @@ -288,7 +288,7 @@ TEST (rpc, send_fail) request.put ("destination", rai::test_genesis_key.pub.to_account ()); request.put ("amount", "100"); std::atomic done (false); - std::thread thread2 ([&system, &done]() { + boost::thread thread2 ([&system, &done]() { system.deadline_set (10s); while (!done) { diff --git a/rai/core_test/wallet.cpp b/rai/core_test/wallet.cpp index e8576073..39fedb8a 100644 --- a/rai/core_test/wallet.cpp +++ b/rai/core_test/wallet.cpp @@ -171,7 +171,7 @@ TEST (wallet, send_async) rai::system system (24000, 1); system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv); rai::keypair key2; - std::thread thread ([&system]() { + boost::thread thread ([&system]() { system.deadline_set (10s); while (!system.nodes[0]->balance (rai::test_genesis_key.pub).is_zero ()) { diff --git a/rai/lib/interface.cpp b/rai/lib/interface.cpp index fe453dff..b540423e 100644 --- a/rai/lib/interface.cpp +++ b/rai/lib/interface.cpp @@ -125,7 +125,7 @@ char * xrb_work_transaction (const char * transaction) auto block (rai::deserialize_block_json (block_l)); if (block != nullptr) { - rai::work_pool pool (std::thread::hardware_concurrency ()); + rai::work_pool pool (boost::thread::hardware_concurrency ()); auto work (pool.generate (block->root ())); block->block_work_set (work); auto json (block->to_json ()); diff --git a/rai/lib/utility.cpp b/rai/lib/utility.cpp index 032fc682..5b02710f 100644 --- a/rai/lib/utility.cpp +++ b/rai/lib/utility.cpp @@ -69,6 +69,12 @@ namespace thread_role } } +void rai::thread_attributes::set (boost::thread::attributes & attrs) +{ + auto attrs_l (&attrs); + attrs_l->set_stack_size (8000000); //8MB +} + /* * Backing code for "release_assert", which is itself a macro */ diff --git a/rai/lib/utility.hpp b/rai/lib/utility.hpp index aee88961..73676348 100644 --- a/rai/lib/utility.hpp +++ b/rai/lib/utility.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -45,6 +46,11 @@ namespace thread_role void set_name (std::string); } +namespace thread_attributes +{ + void set (boost::thread::attributes &); +} + template class observer_set { diff --git a/rai/lib/work.cpp b/rai/lib/work.cpp index b25cbec5..30fbe3d8 100644 --- a/rai/lib/work.cpp +++ b/rai/lib/work.cpp @@ -32,10 +32,12 @@ done (false), opencl (opencl_a) { static_assert (ATOMIC_INT_LOCK_FREE == 2, "Atomic int needed"); - auto count (rai::rai_network == rai::rai_networks::rai_test_network ? 1 : std::min (max_threads_a, std::max (1u, std::thread::hardware_concurrency ()))); + boost::thread::attributes attrs; + rai::thread_attributes::set (attrs); + auto count (rai::rai_network == rai::rai_networks::rai_test_network ? 1 : std::min (max_threads_a, std::max (1u, boost::thread::hardware_concurrency ()))); for (auto i (0); i < count; ++i) { - auto thread (std::thread ([this, i]() { + auto thread (boost::thread (attrs, [this, i]() { rai::thread_role::set (rai::thread_role::name::work); rai::work_thread_reprioritize (); loop (i); diff --git a/rai/lib/work.hpp b/rai/lib/work.hpp index 133d49c8..ea17f332 100644 --- a/rai/lib/work.hpp +++ b/rai/lib/work.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -29,7 +30,7 @@ public: uint64_t generate (rai::uint256_union const &); std::atomic ticket; bool done; - std::vector threads; + std::vector threads; std::list const &)>>> pending; std::mutex mutex; std::condition_variable producer_condition; diff --git a/rai/node/bootstrap.hpp b/rai/node/bootstrap.hpp index af0990c6..5937d3bb 100644 --- a/rai/node/bootstrap.hpp +++ b/rai/node/bootstrap.hpp @@ -11,6 +11,7 @@ #include #include +#include namespace rai { @@ -189,7 +190,7 @@ private: std::mutex mutex; std::condition_variable condition; std::vector> observers; - std::thread thread; + boost::thread thread; }; class bootstrap_server; class bootstrap_listener diff --git a/rai/node/node.cpp b/rai/node/node.cpp index bf31ed12..76407c5c 100644 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -43,9 +43,11 @@ resolver (node_a.service), node (node_a), on (true) { + boost::thread::attributes attrs; + rai::thread_attributes::set (attrs); for (size_t i = 0; i < node.config.network_threads; ++i) { - packet_processing_threads.push_back (std::thread ([this]() { + packet_processing_threads.push_back (boost::thread (attrs, [this]() { rai::thread_role::set (rai::thread_role::name::packet_processing); try { @@ -800,9 +802,9 @@ receive_minimum (rai::xrb_ratio), online_weight_minimum (60000 * rai::Gxrb_ratio), online_weight_quorum (50), password_fanout (1024), -io_threads (std::max (4, std::thread::hardware_concurrency ())), -network_threads (std::max (4, std::thread::hardware_concurrency ())), -work_threads (std::max (4, std::thread::hardware_concurrency ())), +io_threads (std::max (4, boost::thread::hardware_concurrency ())), +network_threads (std::max (4, boost::thread::hardware_concurrency ())), +work_threads (std::max (4, boost::thread::hardware_concurrency ())), enable_voting (true), bootstrap_connections (4), bootstrap_connections_max (64), @@ -3983,9 +3985,11 @@ int rai::node::store_version () rai::thread_runner::thread_runner (boost::asio::io_service & service_a, unsigned service_threads_a) { + boost::thread::attributes attrs; + rai::thread_attributes::set (attrs); for (auto i (0); i < service_threads_a; ++i) { - threads.push_back (std::thread ([&service_a]() { + threads.push_back (boost::thread (attrs, [&service_a]() { rai::thread_role::set (rai::thread_role::name::io); try { diff --git a/rai/node/node.hpp b/rai/node/node.hpp index 0d7fdd78..a2728096 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace boost { @@ -135,7 +136,7 @@ private: std::condition_variable condition; bool started; bool stopped; - std::thread thread; + boost::thread thread; }; class operation { @@ -155,7 +156,7 @@ public: std::mutex mutex; std::condition_variable condition; std::priority_queue, std::greater> operations; - std::thread thread; + boost::thread thread; }; class gap_information { @@ -430,7 +431,7 @@ public: boost::asio::ip::udp::socket socket; std::mutex socket_mutex; boost::asio::ip::udp::resolver resolver; - std::vector packet_processing_threads; + std::vector packet_processing_threads; rai::node & node; bool on; static uint16_t const node_port = rai::rai_network == rai::rai_networks::rai_live_network ? 7075 : 54000; @@ -511,7 +512,7 @@ private: bool started; bool stopped; bool active; - std::thread thread; + boost::thread thread; }; // The network is crawled for representatives by occasionally sending a unicast confirm_req for a specific block and watching to see if it's acknowledged with a vote. class rep_crawler @@ -621,7 +622,7 @@ public: rai::rep_crawler rep_crawler; unsigned warmed_up; rai::block_processor block_processor; - std::thread block_processor_thread; + boost::thread block_processor_thread; rai::block_arrival block_arrival; rai::online_reps online_reps; rai::stat stats; @@ -640,7 +641,7 @@ public: thread_runner (boost::asio::io_service &, unsigned); ~thread_runner (); void join (); - std::vector threads; + std::vector threads; }; class inactive_node { diff --git a/rai/node/wallet.hpp b/rai/node/wallet.hpp index 7b854102..4e71bdac 100644 --- a/rai/node/wallet.hpp +++ b/rai/node/wallet.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -187,7 +188,7 @@ public: rai::node & node; rai::mdb_env & env; bool stopped; - std::thread thread; + boost::thread thread; static rai::uint128_t const generate_priority; static rai::uint128_t const high_priority; diff --git a/rai/slow_test/node.cpp b/rai/slow_test/node.cpp index ed175eea..38e90bcd 100644 --- a/rai/slow_test/node.cpp +++ b/rai/slow_test/node.cpp @@ -36,7 +36,7 @@ TEST (system, generate_mass_activity_long) TEST (system, receive_while_synchronizing) { - std::vector threads; + std::vector threads; { rai::system system (24000, 1); rai::thread_runner runner (system.service, system.nodes[0]->config.io_threads); @@ -112,7 +112,7 @@ TEST (ledger, deep_account_compute) TEST (wallet, multithreaded_send) { - std::vector threads; + std::vector threads; { rai::system system (24000, 1); rai::keypair key; @@ -120,7 +120,7 @@ TEST (wallet, multithreaded_send) wallet_l->insert_adhoc (rai::test_genesis_key.prv); for (auto i (0); i < 20; ++i) { - threads.push_back (std::thread ([wallet_l, &key]() { + threads.push_back (boost::thread ([wallet_l, &key]() { for (auto i (0); i < 1000; ++i) { wallet_l->send_action (rai::test_genesis_key.pub, key.pub, 1000); @@ -141,10 +141,10 @@ TEST (wallet, multithreaded_send) TEST (store, load) { rai::system system (24000, 1); - std::vector threads; + std::vector threads; for (auto i (0); i < 100; ++i) { - threads.push_back (std::thread ([&system]() { + threads.push_back (boost::thread ([&system]() { for (auto i (0); i != 1000; ++i) { auto transaction (system.nodes[0]->store.tx_begin (true));