Skip to content

C++语言导学(9): 并发(上)

引言

多线程程序常用来提高速度和吞吐量,但数据同步与共享等问题让多线程开发变得复杂,有时候过多的控制底层细节往往难以防范错误的发生。

线程: thread

可与其它计算并行执行的计算称为任务(task), 线程(thread)是任务在程序中的系统级表示。

#include <thread>

void f();                   // 函数

struct F                    // 函数对象
{
    void operator()();
};

int main()
{
    std::thread t1{f};     // f()在独立线程中执行
    std::thread t2{F()};   // F()()在独立线程中执行

    t1.join(); // 等待t1线程结束
    t2.join(); // 等待t2线程结束
}

join是等待一个线程结束;一个程序的所有线程共享单一的地址空间。

编译的时候加上参数-lpthread,底层是使用pthread来实现线程的。

线程参数

比如:

#include <iostream>
#include <vector>
#include <thread>

void f(std::vector<double>& v)
{
    v.push_back(100);
}

struct F{
    std::vector<double> v;
    F(std::vector<double> vv): v(vv){}
    void operator()()
    {
        v.push_back(100);
    }
};

int main()
{
    std::vector<double> vec1{1,2,3,4,5,6,7,8,9};
    std::vector<double> vec2{10,11,12,13,14};

    std::cout << "vec1 size: " << vec1.size() << std::endl;   // 9
    std::cout << "vec2 size: " << vec2.size() << std::endl;   // 5

    std::thread t1{f,std::ref(vec1)};   // 引用传递
    std::thread t2{F{vec2}};            // 值传递

    t1.join();
    t2.join();

    std::cout << "vec1 size: " << vec1.size() << std::endl;  // 10
    std::cout << "vec2 size: " << vec2.size() << std::endl;  // 5
}

如果参数是引用传递给线程,需要注意线程运行处理期间引用的值可能被其它程序逻辑改变,否则使用conststd::cref明确不可修改。

如果要线程返回结果,参数是引用传递的话就自动改变了引用的内容,等于是一种返回结果的方式;也可以传递第二个引用参数,以比较明确的方式通过引用返回结果,即调用-返回模式

共享数据: 互斥锁

比如:

#include <iostream>
#include <string>
#include <thread>
#include <mutex>

std::mutex m1,m2,m3; // 控制共享访问的互斥量

int sh=0;

void f(const std::string& s)
{
    sh += 7;
    std::cout << s << ", sh=" << sh << std::endl;
}

int main()
{
    std::thread t1{f,"t1"};
    std::thread t2{f,"t2"};

    t1.join();
    t2.join();
}

在函数f中有两个问题,首先cout不是线程安全的,所以线程t1和t2的输出是顺序混乱的;另外,对于sh的访问可能会出现同时访问的情况。

将cout换成printf可以确保线程安全输出:

void f(const std::string& s)
{
    sh += 7;
    printf("%s, sh=%d\n", s.c_str(), sh);
}

而对于全局变量sh的访问,避免出现同时访问情况需要加互斥量锁:

void f(const std::string& s)
{
    std::scoped_lock lck{m1,m2,m3};    // 互斥量锁,否则等待
    sh += 7;
    std::cout << s << ", sh=" << sh << std::endl;
    // 离开作用域,隐式释放互斥量锁
}

因为有了锁,线程不安全的cout也不会出现输出混乱的情况了;这里是需要同时拿到3个互斥量锁,否则就等待。

加锁-释放锁模式的代价很高,可能并一定比调用-返回模式更高效。

共享数据: 共享锁

std::shared_mutex mx;    // 可以共享的互斥量

void reader()
{
    std::shared_lock lck{mx};   // 共享锁,共同访问
    //...读...
}

void writer()
{
    std::unique_lock lck{mx};  // 互斥锁,唯一访问
    //...写...
}

等待事件: 条件变量

用生产者消费者模型来举例。

#include <thread>
#include <mutex>
#include <condition_variable>

class Message{  // 通信的对象
    //...
};

queue<Message> mqueue;   // 消息队列
condition_variable mcond; //  条件变量
mutex mmutex;   // 互斥锁

消费者:

void consumer()
{
    while(true){
        unique_lock lck{mmutex};

        // 条件变量释放获取的互斥锁
        // 接到通知后会自动检查消息队列是否非空
        // 一旦非空则自动重新获取锁并进一步处理
        mcond.wait(lck, []{ return !mqueue.empty(); });

        auto m = mqueue.front(); // 获取消息实体
        mqueue.pop();

        lck.unlock();  // 及时释放锁

        //...处理数据...
    }
}

生产者:

void producer()
{
    while(true){
        Message m;
        //...m data...

        scoped_lock lck{mmutex};  // 互斥锁保证消息队列操作是唯一的
        mqueue.push(m);

        mcond.notify_one(); // 通知条件变量去检查自己的唤醒条件是否符合

        // 隐式释放互斥锁
    }
}

任务通信

在task层次通信,而不是底层的基于锁的通信,主要有这些方法<future>:

  • future, promise: 期待任务返回值和承诺生成任务需要的值
  • package_task: 简化future和promise连接操作
  • async: 异步计算

async例子:

#include <iostream>
#include <vector>
#include <future>
#include <numeric>

using namespace std;

double accum(double* begin, double* end, double init)
{
    return accumulate(begin, end, init);
}

double comp(vector<double>& v)
{
    if(v.size() < 10000)
    {
        return accumulate(v.begin(), v.end(), 0.0);
    }

    auto v0 = &v[0];
    auto sz = v.size();

    auto f0 = async(accum,v0,        v0+sz/4,   0.0);
    auto f1 = async(accum,v0+sz/4,   v0+sz/2,   0.0);
    auto f2 = async(accum,v0+sz/2,   v0+sz*3/4, 0.0);
    auto f3 = async(accum,v0+sz*3/4, v0+sz,     0.0);

    return f0.get() + f1.get() + f2.get() + f3.get(); // 收集并组合结果
}

int main()
{
    int N = 40000;
    vector<double> v(N);
    for(auto i=0; i < N; i++)
    {
        v.push_back(i);
    }

    auto sum = comp(v);
    cout << "sum of v is " << sum << endl;
}

在这些方法中不再掺杂锁等操作,而是交由底层通信机制自动来处理,只关注任务的返回结果;但很多时间多线程开发并不是简单的只关注结果,目前看来这些方法比较适合简单的并发任务。