converting to boost threads and setting 8MB thread stacks (#1289)

converting to boost threads and setting 8MB thread stack size universally
This commit is contained in:
Russel Waters 2018-10-10 19:40:01 -04:00 committed by Roy Keene
commit e1ce480ebb
14 changed files with 55 additions and 33 deletions

View file

@ -1013,7 +1013,7 @@ TEST (udp_buffer, one_buffer_multithreaded)
{
rai::stat stats;
rai::udp_buffer buffer (stats, 512, 1);
std::thread thread ([&buffer]() {
boost::thread thread ([&buffer]() {
auto done (false);
while (!done)
{
@ -1038,10 +1038,10 @@ TEST (udp_buffer, many_buffers_multithreaded)
{
rai::stat stats;
rai::udp_buffer buffer (stats, 512, 16);
std::vector<std::thread> threads;
std::vector<boost::thread> threads;
for (auto i (0); i < 4; ++i)
{
threads.push_back (std::thread ([&buffer]() {
threads.push_back (boost::thread ([&buffer]() {
auto done (false);
while (!done)
{
@ -1057,7 +1057,7 @@ TEST (udp_buffer, many_buffers_multithreaded)
std::atomic_int count (0);
for (auto i (0); i < 4; ++i)
{
threads.push_back (std::thread ([&buffer, &count]() {
threads.push_back (boost::thread ([&buffer, &count]() {
auto done (false);
for (auto i (0); !done && i < 1000; ++i)
{

View file

@ -60,7 +60,7 @@ TEST (alarm, one)
condition.notify_one ();
});
boost::asio::io_service::work work (service);
std::thread thread ([&service]() { service.run (); });
boost::thread thread ([&service]() { service.run (); });
std::unique_lock<std::mutex> unique (mutex);
condition.wait (unique, [&]() { return !!done; });
service.stop ();
@ -83,10 +83,10 @@ TEST (alarm, many)
});
}
boost::asio::io_service::work work (service);
std::vector<std::thread> threads;
std::vector<boost::thread> threads;
for (auto i (0); i < 50; ++i)
{
threads.push_back (std::thread ([&service]() { service.run (); }));
threads.push_back (boost::thread ([&service]() { service.run (); }));
}
std::unique_lock<std::mutex> unique (mutex);
condition.wait (unique, [&]() { return count == 50; });
@ -116,7 +116,7 @@ TEST (alarm, top_execution)
promise.set_value (false);
});
boost::asio::io_service::work work (service);
std::thread thread ([&service]() {
boost::thread thread ([&service]() {
service.run ();
});
promise.get_future ().get ();

View file

@ -253,7 +253,7 @@ TEST (rpc, send)
request.put ("source", rai::test_genesis_key.pub.to_account ());
request.put ("destination", rai::test_genesis_key.pub.to_account ());
request.put ("amount", "100");
std::thread thread2 ([&system]() {
boost::thread thread2 ([&system]() {
system.deadline_set (10s);
while (system.nodes[0]->balance (rai::test_genesis_key.pub) == rai::genesis_amount)
{
@ -288,7 +288,7 @@ TEST (rpc, send_fail)
request.put ("destination", rai::test_genesis_key.pub.to_account ());
request.put ("amount", "100");
std::atomic<bool> done (false);
std::thread thread2 ([&system, &done]() {
boost::thread thread2 ([&system, &done]() {
system.deadline_set (10s);
while (!done)
{

View file

@ -171,7 +171,7 @@ TEST (wallet, send_async)
rai::system system (24000, 1);
system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv);
rai::keypair key2;
std::thread thread ([&system]() {
boost::thread thread ([&system]() {
system.deadline_set (10s);
while (!system.nodes[0]->balance (rai::test_genesis_key.pub).is_zero ())
{

View file

@ -125,7 +125,7 @@ char * xrb_work_transaction (const char * transaction)
auto block (rai::deserialize_block_json (block_l));
if (block != nullptr)
{
rai::work_pool pool (std::thread::hardware_concurrency ());
rai::work_pool pool (boost::thread::hardware_concurrency ());
auto work (pool.generate (block->root ()));
block->block_work_set (work);
auto json (block->to_json ());

View file

@ -69,6 +69,12 @@ namespace thread_role
}
}
void rai::thread_attributes::set (boost::thread::attributes & attrs)
{
auto attrs_l (&attrs);
attrs_l->set_stack_size (8000000); //8MB
}
/*
* Backing code for "release_assert", which is itself a macro
*/

View file

@ -2,6 +2,7 @@
#include <boost/filesystem.hpp>
#include <boost/system/error_code.hpp>
#include <boost/thread/thread.hpp>
#include <functional>
#include <mutex>
@ -45,6 +46,11 @@ namespace thread_role
void set_name (std::string);
}
namespace thread_attributes
{
void set (boost::thread::attributes &);
}
template <typename... T>
class observer_set
{

View file

@ -32,10 +32,12 @@ done (false),
opencl (opencl_a)
{
static_assert (ATOMIC_INT_LOCK_FREE == 2, "Atomic int needed");
auto count (rai::rai_network == rai::rai_networks::rai_test_network ? 1 : std::min (max_threads_a, std::max (1u, std::thread::hardware_concurrency ())));
boost::thread::attributes attrs;
rai::thread_attributes::set (attrs);
auto count (rai::rai_network == rai::rai_networks::rai_test_network ? 1 : std::min (max_threads_a, std::max (1u, boost::thread::hardware_concurrency ())));
for (auto i (0); i < count; ++i)
{
auto thread (std::thread ([this, i]() {
auto thread (boost::thread (attrs, [this, i]() {
rai::thread_role::set (rai::thread_role::name::work);
rai::work_thread_reprioritize ();
loop (i);

View file

@ -1,6 +1,7 @@
#pragma once
#include <boost/optional.hpp>
#include <boost/thread/thread.hpp>
#include <rai/lib/config.hpp>
#include <rai/lib/numbers.hpp>
#include <rai/lib/utility.hpp>
@ -29,7 +30,7 @@ public:
uint64_t generate (rai::uint256_union const &);
std::atomic<int> ticket;
bool done;
std::vector<std::thread> threads;
std::vector<boost::thread> threads;
std::list<std::pair<rai::uint256_union, std::function<void(boost::optional<uint64_t> const &)>>> pending;
std::mutex mutex;
std::condition_variable producer_condition;

View file

@ -11,6 +11,7 @@
#include <unordered_set>
#include <boost/log/sources/logger.hpp>
#include <boost/thread/thread.hpp>
namespace rai
{
@ -189,7 +190,7 @@ private:
std::mutex mutex;
std::condition_variable condition;
std::vector<std::function<void(bool)>> observers;
std::thread thread;
boost::thread thread;
};
class bootstrap_server;
class bootstrap_listener

View file

@ -43,9 +43,11 @@ resolver (node_a.service),
node (node_a),
on (true)
{
boost::thread::attributes attrs;
rai::thread_attributes::set (attrs);
for (size_t i = 0; i < node.config.network_threads; ++i)
{
packet_processing_threads.push_back (std::thread ([this]() {
packet_processing_threads.push_back (boost::thread (attrs, [this]() {
rai::thread_role::set (rai::thread_role::name::packet_processing);
try
{
@ -800,9 +802,9 @@ receive_minimum (rai::xrb_ratio),
online_weight_minimum (60000 * rai::Gxrb_ratio),
online_weight_quorum (50),
password_fanout (1024),
io_threads (std::max<unsigned> (4, std::thread::hardware_concurrency ())),
network_threads (std::max<unsigned> (4, std::thread::hardware_concurrency ())),
work_threads (std::max<unsigned> (4, std::thread::hardware_concurrency ())),
io_threads (std::max<unsigned> (4, boost::thread::hardware_concurrency ())),
network_threads (std::max<unsigned> (4, boost::thread::hardware_concurrency ())),
work_threads (std::max<unsigned> (4, boost::thread::hardware_concurrency ())),
enable_voting (true),
bootstrap_connections (4),
bootstrap_connections_max (64),
@ -3983,9 +3985,11 @@ int rai::node::store_version ()
rai::thread_runner::thread_runner (boost::asio::io_service & service_a, unsigned service_threads_a)
{
boost::thread::attributes attrs;
rai::thread_attributes::set (attrs);
for (auto i (0); i < service_threads_a; ++i)
{
threads.push_back (std::thread ([&service_a]() {
threads.push_back (boost::thread (attrs, [&service_a]() {
rai::thread_role::set (rai::thread_role::name::io);
try
{

View file

@ -16,6 +16,7 @@
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/thread/thread.hpp>
namespace boost
{
@ -135,7 +136,7 @@ private:
std::condition_variable condition;
bool started;
bool stopped;
std::thread thread;
boost::thread thread;
};
class operation
{
@ -155,7 +156,7 @@ public:
std::mutex mutex;
std::condition_variable condition;
std::priority_queue<operation, std::vector<operation>, std::greater<operation>> operations;
std::thread thread;
boost::thread thread;
};
class gap_information
{
@ -430,7 +431,7 @@ public:
boost::asio::ip::udp::socket socket;
std::mutex socket_mutex;
boost::asio::ip::udp::resolver resolver;
std::vector<std::thread> packet_processing_threads;
std::vector<boost::thread> packet_processing_threads;
rai::node & node;
bool on;
static uint16_t const node_port = rai::rai_network == rai::rai_networks::rai_live_network ? 7075 : 54000;
@ -511,7 +512,7 @@ private:
bool started;
bool stopped;
bool active;
std::thread thread;
boost::thread thread;
};
// The network is crawled for representatives by occasionally sending a unicast confirm_req for a specific block and watching to see if it's acknowledged with a vote.
class rep_crawler
@ -621,7 +622,7 @@ public:
rai::rep_crawler rep_crawler;
unsigned warmed_up;
rai::block_processor block_processor;
std::thread block_processor_thread;
boost::thread block_processor_thread;
rai::block_arrival block_arrival;
rai::online_reps online_reps;
rai::stat stats;
@ -640,7 +641,7 @@ public:
thread_runner (boost::asio::io_service &, unsigned);
~thread_runner ();
void join ();
std::vector<std::thread> threads;
std::vector<boost::thread> threads;
};
class inactive_node
{

View file

@ -1,5 +1,6 @@
#pragma once
#include <boost/thread/thread.hpp>
#include <rai/node/common.hpp>
#include <rai/node/lmdb.hpp>
#include <rai/node/openclwork.hpp>
@ -187,7 +188,7 @@ public:
rai::node & node;
rai::mdb_env & env;
bool stopped;
std::thread thread;
boost::thread thread;
static rai::uint128_t const generate_priority;
static rai::uint128_t const high_priority;

View file

@ -36,7 +36,7 @@ TEST (system, generate_mass_activity_long)
TEST (system, receive_while_synchronizing)
{
std::vector<std::thread> threads;
std::vector<boost::thread> threads;
{
rai::system system (24000, 1);
rai::thread_runner runner (system.service, system.nodes[0]->config.io_threads);
@ -112,7 +112,7 @@ TEST (ledger, deep_account_compute)
TEST (wallet, multithreaded_send)
{
std::vector<std::thread> threads;
std::vector<boost::thread> threads;
{
rai::system system (24000, 1);
rai::keypair key;
@ -120,7 +120,7 @@ TEST (wallet, multithreaded_send)
wallet_l->insert_adhoc (rai::test_genesis_key.prv);
for (auto i (0); i < 20; ++i)
{
threads.push_back (std::thread ([wallet_l, &key]() {
threads.push_back (boost::thread ([wallet_l, &key]() {
for (auto i (0); i < 1000; ++i)
{
wallet_l->send_action (rai::test_genesis_key.pub, key.pub, 1000);
@ -141,10 +141,10 @@ TEST (wallet, multithreaded_send)
TEST (store, load)
{
rai::system system (24000, 1);
std::vector<std::thread> threads;
std::vector<boost::thread> threads;
for (auto i (0); i < 100; ++i)
{
threads.push_back (std::thread ([&system]() {
threads.push_back (boost::thread ([&system]() {
for (auto i (0); i != 1000; ++i)
{
auto transaction (system.nodes[0]->store.tx_begin (true));