C++语言导学(9): 并发(上)¶
引言¶
多线程程序常用来提高速度和吞吐量,但数据同步与共享等问题让多线程开发变得复杂,有时候过多的控制底层细节往往难以防范错误的发生。
线程: thread¶
可与其它计算并行执行的计算称为任务(task)
, 线程(thread)
是任务在程序中的系统级表示。
#include <thread>
void f(); // 函数
struct F // 函数对象
{
void operator()();
};
int main()
{
std::thread t1{f}; // f()在独立线程中执行
std::thread t2{F()}; // F()()在独立线程中执行
t1.join(); // 等待t1线程结束
t2.join(); // 等待t2线程结束
}
join
是等待一个线程结束;一个程序的所有线程共享单一的地址空间。
编译的时候加上参数-lpthread
,底层是使用pthread
来实现线程的。
线程参数¶
比如:
#include <iostream>
#include <vector>
#include <thread>
void f(std::vector<double>& v)
{
v.push_back(100);
}
struct F{
std::vector<double> v;
F(std::vector<double> vv): v(vv){}
void operator()()
{
v.push_back(100);
}
};
int main()
{
std::vector<double> vec1{1,2,3,4,5,6,7,8,9};
std::vector<double> vec2{10,11,12,13,14};
std::cout << "vec1 size: " << vec1.size() << std::endl; // 9
std::cout << "vec2 size: " << vec2.size() << std::endl; // 5
std::thread t1{f,std::ref(vec1)}; // 引用传递
std::thread t2{F{vec2}}; // 值传递
t1.join();
t2.join();
std::cout << "vec1 size: " << vec1.size() << std::endl; // 10
std::cout << "vec2 size: " << vec2.size() << std::endl; // 5
}
如果参数是引用传递给线程,需要注意线程运行处理期间引用的值可能被其它程序逻辑改变,否则使用const
和std::cref
明确不可修改。
如果要线程返回结果,参数是引用传递的话就自动改变了引用的内容,等于是一种返回结果的方式;也可以传递第二个引用参数,以比较明确的方式通过引用返回结果,即调用-返回模式
。
共享数据: 互斥锁¶
比如:
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
std::mutex m1,m2,m3; // 控制共享访问的互斥量
int sh=0;
void f(const std::string& s)
{
sh += 7;
std::cout << s << ", sh=" << sh << std::endl;
}
int main()
{
std::thread t1{f,"t1"};
std::thread t2{f,"t2"};
t1.join();
t2.join();
}
在函数f中有两个问题,首先cout不是线程安全的,所以线程t1和t2的输出是顺序混乱的;另外,对于sh的访问可能会出现同时访问的情况。
将cout换成printf可以确保线程安全输出:
而对于全局变量sh的访问,避免出现同时访问情况需要加互斥量锁:
void f(const std::string& s)
{
std::scoped_lock lck{m1,m2,m3}; // 互斥量锁,否则等待
sh += 7;
std::cout << s << ", sh=" << sh << std::endl;
// 离开作用域,隐式释放互斥量锁
}
因为有了锁,线程不安全的cout也不会出现输出混乱的情况了;这里是需要同时拿到3个互斥量锁,否则就等待。
加锁-释放锁模式
的代价很高,可能并一定比调用-返回模式
更高效。
共享数据: 共享锁¶
std::shared_mutex mx; // 可以共享的互斥量
void reader()
{
std::shared_lock lck{mx}; // 共享锁,共同访问
//...读...
}
void writer()
{
std::unique_lock lck{mx}; // 互斥锁,唯一访问
//...写...
}
等待事件: 条件变量¶
用生产者消费者模型来举例。
#include <thread>
#include <mutex>
#include <condition_variable>
class Message{ // 通信的对象
//...
};
queue<Message> mqueue; // 消息队列
condition_variable mcond; // 条件变量
mutex mmutex; // 互斥锁
消费者:
void consumer()
{
while(true){
unique_lock lck{mmutex};
// 条件变量释放获取的互斥锁
// 接到通知后会自动检查消息队列是否非空
// 一旦非空则自动重新获取锁并进一步处理
mcond.wait(lck, []{ return !mqueue.empty(); });
auto m = mqueue.front(); // 获取消息实体
mqueue.pop();
lck.unlock(); // 及时释放锁
//...处理数据...
}
}
生产者:
void producer()
{
while(true){
Message m;
//...m data...
scoped_lock lck{mmutex}; // 互斥锁保证消息队列操作是唯一的
mqueue.push(m);
mcond.notify_one(); // 通知条件变量去检查自己的唤醒条件是否符合
// 隐式释放互斥锁
}
}
任务通信¶
在task层次通信,而不是底层的基于锁的通信,主要有这些方法<future>
:
- future, promise: 期待任务返回值和承诺生成任务需要的值
- package_task: 简化future和promise连接操作
- async: 异步计算
async例子:
#include <iostream>
#include <vector>
#include <future>
#include <numeric>
using namespace std;
double accum(double* begin, double* end, double init)
{
return accumulate(begin, end, init);
}
double comp(vector<double>& v)
{
if(v.size() < 10000)
{
return accumulate(v.begin(), v.end(), 0.0);
}
auto v0 = &v[0];
auto sz = v.size();
auto f0 = async(accum,v0, v0+sz/4, 0.0);
auto f1 = async(accum,v0+sz/4, v0+sz/2, 0.0);
auto f2 = async(accum,v0+sz/2, v0+sz*3/4, 0.0);
auto f3 = async(accum,v0+sz*3/4, v0+sz, 0.0);
return f0.get() + f1.get() + f2.get() + f3.get(); // 收集并组合结果
}
int main()
{
int N = 40000;
vector<double> v(N);
for(auto i=0; i < N; i++)
{
v.push_back(i);
}
auto sum = comp(v);
cout << "sum of v is " << sum << endl;
}
在这些方法中不再掺杂锁等操作,而是交由底层通信机制自动来处理,只关注任务的返回结果;但很多时间多线程开发并不是简单的只关注结果,目前看来这些方法比较适合简单的并发任务。
- 微信搜索: 「 MinYiLife 」, 关注公众号!
- 本文链接: https://www.lesliezhu.com/blog/2022/11/02/cpp_9/
- 版权声明: 原创文章,如需转载请注明文章作者和出处。谢谢!