C++语言导学(10):并发(下)¶
引言¶
并发问题或多线程编程是一个复杂的话题,这里主要涉及线程安全数据结构。
最大线程数¶
这可以获取当前系统中可以支持的线程最大数量,但并不保证一定可以使用到这么多线程数。
过多的线程会导致系统频繁切换线程,导致效率反而降低,并不是线程数越多越好。
线程ID¶
有两种获取线程ID的方式。
等待线程结束¶
join()
只能调用一次,调用后就不是joinable()
了。
不等待线程结束¶
应为线程结束工作设定条件,否则这个守护线程(daemon thread)
会一直存在,一旦放到后台就不再是joinable()
了。
线程托管类¶
#include <iostream>
#include <thread>
using namespace std;
struct func{
int m_i;
//数据复制到线程内部,防止悬空引用
func(int i): m_i(i){}
void operator()(){
for(unsigned j=0; j < m_i; ++j){
cout << j << endl;
}
}
};
class ScopeThread{
std::thread m_t;
public:
explicit ScopeThread(std::thread t):
m_t(std::move(t))//使用移动语义
{
if(!m_t.joinable()){
throw std::logic_error("No thread");
}
}
~ScopeThread(){
m_t.join();
}
ScopeThread(ScopeThread const&)=delete; // 防止自动生成的代码引起线程所有权混乱
ScopeThread& operator=(ScopeThread const&)=delete;
};
int main(){
int i=10;
ScopeThread t{std::thread(func(i))};
return 0;
}
这是一个用类来托管线程工作的例子,这个类使用RAII原则管理线程的生命周期。
线程安全数据结构(thread-safe)¶
#include <iostream>
#include <thread>
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
using namespace std;
template<typename T>
class threadsafe_queue{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue(){}
threadsafe_queue(threadsafe_queue const& other){
std::lock_guard<std::mutex> lk(other.mut);
data_queue = other.data_queue;
}
void push(T new_value){
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value){
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{ return !data_queue.empty();});
value = data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop(){
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{ return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value){
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty()){
return false;
}
value = data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop(){
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty()){
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
int main(){
int value = 0;
threadsafe_queue<int> data;
data.push(8);
data.push(7);
data.push(10);
data.push(24);
data.wait_and_pop(value);
cout << "1: value=" << value << endl; // 8
if(data.try_pop(value)){
cout << "2: value=" << value << endl; // 7
}
auto value2 = data.wait_and_pop();
cout << "3: value2=" << *value2 << endl; // 10
std::shared_ptr<int> value3 = data.try_pop();
cout << "4: value3=" << *value3 << endl; // 24
return 0;
}
这里通过互斥量锁和条件变量来确保线程安全。
一个简单线程池¶
#include <iostream>
#include <vector>
#include <thread>
#include <atomic>
#include <functional>
#include "threadsafe_queue.hpp"
using namespace std;
// 和某个vector引用绑定,利用RAII原则释放该引用
class join_threads{
std::vector<std::thread>& m_threads;
public:
explicit join_threads(std::vector<std::thread>& threads):
m_threads(threads){}
~join_threads(){
for(unsigned long i=0; i < m_threads.size(); ++i){
if(m_threads[i].joinable()){
m_threads[i].join();
}
}
}
};
class thread_pool{
std::atomic_bool m_done; // 原子操作
threadsafe_queue<std::function<void()>> work_queue;
std::vector<std::thread> threads;
join_threads joiner; // 这个成员必须在最后声明,这样才是最后销毁
void worker_thread(){
while(!m_done){
std::function<void()> task;
if(work_queue.try_pop(task)){
task();
} else {
std::this_thread::yield();// 释放自己的线程资源
}
}
}
public:
thread_pool(): m_done(false), joiner(threads)
{
unsigned const thread_count = std::thread::hardware_concurrency();
try{
for(unsigned i=0; i < thread_count; ++i){
threads.push_back(std::thread(&thread_pool::worker_thread,this));
}
} catch(...){
m_done = true;
throw;
}
}
~thread_pool(){
m_done = true;
}
template<typename FunctionType>
void submit(FunctionType f){
work_queue.push(std::function<void()>(f));
}
};
int main(){
thread_pool pool;
pool.submit([=]{cout << "1" << endl; });
pool.submit([=]{cout << "2" << endl; });
return 0;
}
这个线程池比较简单,不会对提交的线程任务作进一步细化管理。
总结¶
互斥保护数据结构的方式是命令阻止真正的并发访问,变成串行化;但被互斥保护的范围越小、时间越短,需要的串行化操作就越少,并发程度就越高。
利用互斥和保护锁的数据结构是最简单的、线程安全的数据结构。
- 微信搜索: 「 MinYiLife 」, 关注公众号!
- 本文链接: https://www.lesliezhu.com/blog/2022/11/04/cpp_10/
- 版权声明: 原创文章,如需转载请注明文章作者和出处。谢谢!