条件变量

背景介绍

  对于一般的生产者-消费者模型,生产者会产生数据供消费者使用。生产者需要在准备好数据的时候通知消费者,消费者通过判断是否已经有数据来决定要不要消费数据。一般情况下,生产者和消费者的实现如下所示:

#include<bits/stdc++.h>
#include<mutex>
#include<unistd.h>
using namespace std;
vector<int> buffer;
mutex mtx;
bool dataIsReady = false;
void producer(){
    while (true) {
        sleep(2);
        std::unique_lock<std::mutex> lk(mtx);
        //随机生成10个数字
        for (int i = 0; i < 10; i++) {
            buffer.push_back(rand()%20);
        }
        dataIsReady = true;
    }
}
void consumer(){
    while (true) {
        sleep(2);
        std::unique_lock<std::mutex> lk(mtx);
        if (!dataIsReady) 
            continue;
        //输出10个数字之后清空数组
        for (int i = 0; i < buffer.size(); i++) {
            if (i) putchar(' ');
            cout << buffer[i];
        }
        cout << endl;
        buffer.clear();
        dataIsReady = false;
    }
}
int main(){
    srand(time(NULL));
    thread produc(producer);
    thread consum(consumer);
    produc.join();
    consum.join();
    return 0;
}

  运行结果如下所示:

14 8 6 13 18 2 16 5 3 14
10 14 4 0 15 12 8 13 1 3
17 0 15 5 4 6 1 4 17 11
0 3 19 6 17 18 8 5 15 4
0 5 10 16 18 18 1 6 11 2
10 0 15 17 18 11 4 11 15 13
2 15 9 14 13 18 12 14 3 19
18 3 17 0 12 15 18 13 13 1
7 3 2 14 13 0 18 9 3 13
...

  这种写法可以正常工作,但是存在一个问题:每次consumer线程都会循环判断数据是否准备好,这个过程中线程不会让出资源,因此循环判断会带来不必要的开销。正因如此,才需要学习条件变量(condition_variable)。

条件变量

  条件变量(condition_variable)是C++中的一个类,能够阻塞线程直到相关条件满足时被唤醒。条件变量使用wait函数阻塞线程,wait函数一般与unique_lock配合使用。直到有其他线程中的同一个条件变量调用了唤醒函数,当前被wait阻塞的线程才会被唤醒。使用条件变量可以再相关条件不满足时阻塞线程以让出资源,防止因为不断的循环加锁判定带来的开销。
  除了构造函数和析构函数以外,condition_variable类的成员函数有wait函数和notify函数:

wait函数

  wait函数有三个,分别为wait、wait_for、wait_until,这里着重介绍一下wait方法:
  wait函数有以下两个版本:

类型 函数原型
unconditional (1) void wait (unique_lock<mutex>& lck);
predicate (2) template<class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);

  (1)参数lck
  一个加了锁的unique_lock对象,所有调用了当前条件变量wait方法的对象都应该在下层使用同一个mutex对象。
  (2)参数pred
  一个可以调用的对象或者函数,函数或者对象没有参数并且需要返回一个bool类型的值,线程将会不停的调用wait函数直到该返回值为true;
  对于以上两个版本的函数,其使用效果如下所示,且以下示例等价:

std::mutex mtx;
std::condition_variable cv;
bool isReady;
//写法一
while(true) {
    sleep(1);
    std::unique_lock<std::mutext> lk(mtx);
    while (!isReady) {
        cv.wait(lk);
    }
}
//写法二
while (true) {
    sleep(1);
    std::unique_lock<std::mutext> lk(mtx);
    cv.wait(lk, []{return isReady;});
}

  使用lambda函数更显简洁,因此建议第二种写法。

notify函数

  notify函数有两个,分别为notify_one和notify_all。
  (1)notify_one
  唤醒一个满足该条件变量wait函数的线程,如果没有线程处于等待状态,那么此函数不做任何事。如果等待的线程有多个,那么随机选择一个线程唤醒。
  (2)notify_all
  唤醒所有满足该条件变量wait函数的线程,如果没有线程处于等待状态,那么此函数不做任何事。
  notify函数的示例如下所示:

// condition_variable::notify_all
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
    std::unique_lock<std::mutex> lck(mtx);
    while (!ready) cv.wait(lck);
    // ...
    std::cout << "thread " << id << '\n';
}

void go() {
    //修改ready标记,并通知打印线程工作
    std::unique_lock<std::mutex> lck(mtx);
    ready = true;
    cv.notify_all();
}

int main (){
    std::thread threads[10];
    // 创建10个线程,每个线程当ready标记为真时打印自己的id号
    for (int i=0; i<10; ++i)
        threads[i] = std::thread(print_id,i);

    std::cout << "10 threads ready to race...\n";
    go();                       // go!

    for (auto& th : threads) th.join();

    return 0;
}

  执行结果如下所示:

10 threads ready to race...
thread 0
thread 1
thread 2
thread 3
thread 4
thread 5
thread 6
thread 7
thread 8
thread 9

  程序在调用了go()函数后,修改了ready标记,然后通知其他线程工作,其他线程便不再阻塞,完成打印工作。

条件变量的使用

  熟悉了条件变量的常用变量及其使用方法,那么我们可以用来实现常见的模型,比如生产者-消费者模型。

生产者-消费者模型

  生产者-消费者模型中生产者负责生产数据,消费者负责消费数据。这里使用5个生产者和5个消费者对一个buff进行操作,这个buffer使用队列存储数据,生产者生产的数据按照顺序存入,消费者在buffer不空时从队列头部取出数据。为了防止生产者产生太多的数据,这里给buffer添加了容量限制。
  生产者-消费者模型的结构如下所示:

  其代码实现如下所示:

#include<bits/stdc++.h>
#include<unistd.h>
#include<mutex>
#include<condition_variable>
using namespace std;

std::mutex mtx;                 //mutex变量
std::condition_variable cv;     //condition_variable变量
std::queue<int> databuffer;     //数据buffer
//队列的最大容量
const int MAXSIZE = 100;
void producer(){
    while (true) {
        sleep(5);
        //获取锁
        std::unique_lock<std::mutex> lk(mtx);
        //等待,直到数据不满则生产数据
        cv.wait(lk,[]{return databuffer.size() < MAXSIZE;});
        int val = rand();
        databuffer.push(val);
        cout << "[ Producer Thread ]-Thread_id: " << std::this_thread::get_id() << " Push " << val << endl;
        //生产数据通知其他线程
        cv.notify_all();
    }
}
void consumer(){
    while (true) {
        sleep(5);
        //带参数的构造函数 创建之后立即加锁,销毁对象时会自动解锁,所以定义在局部使用会比较方便
        std::unique_lock<std::mutex> lk(mtx);
        //等待,直到数据buffer不空时消费数据
        cv.wait(lk,[]{return !databuffer.empty();});

        int val = databuffer.front();
        databuffer.pop();
        cout << "[ Consumer Thread ]-Thread_id: " << std::this_thread::get_id() << " Pop " << val << endl;
        //消费数据后通知其他线程
        cv.notify_all();
    }
}
int main(){
    srand(time(nullptr));
    vector<thread> producers;
    vector<thread> consumers;
    //创建线程
    for (int i =0 ; i < 5; i++) {
        producers.push_back(thread(producer));
        consumers.push_back(thread(consumer));
    }
    for (int i =0 ; i < 5; i++) {
        producers[i].join();
        consumers[i].join();
    }    
    return 0;
}

  部分运行结果如下所示:

[ Producer Thread ]-Thread_id: 140737348204288 Push 1064460442
[ Consumer Thread ]-Thread_id: 140737339811584 Pop 1064460442
[ Producer Thread ]-Thread_id: 140737331418880 Push 1572334329
[ Consumer Thread ]-Thread_id: 140737323026176 Pop 1572334329
[ Producer Thread ]-Thread_id: 140737314633472 Push 53770771
[ Consumer Thread ]-Thread_id: 140737306240768 Pop 53770771
[ Producer Thread ]-Thread_id: 140737297848064 Push 1843725020
[ Consumer Thread ]-Thread_id: 140737289455360 Pop 1843725020
[ Producer Thread ]-Thread_id: 140737281062656 Push 1810443650
[ Consumer Thread ]-Thread_id: 140737272669952 Pop 1810443650
[ Producer Thread ]-Thread_id: 140737348204288 Push 165477773
[ Consumer Thread ]-Thread_id: 140737339811584 Pop 165477773
[ Producer Thread ]-Thread_id: 140737331418880 Push 24357394
[ Producer Thread ]-Thread_id: 140737314633472 Push 2070814110
[ Consumer Thread ]-Thread_id: 140737323026176 Pop 24357394
[ Consumer Thread ]-Thread_id: 140737306240768 Pop 2070814110
[ Producer Thread ]-Thread_id: 140737297848064 Push 1358644763
...

  参考博客C++11条件变量使用详解.

父母子女-水果问题

  问题描述:
  父亲、母亲分别向一个果盘中放置一个水果。父亲放置苹果,母亲放置橘子。儿子专门等待果盘中的苹果。女儿专门等待果盘中的橘子。当果盘中准备好水果以后,儿子和女儿分别根据自己的需要拿走水果。
  这其实也是一个生产者和消费者的模型,使用条件变量实现如下:

#include<bits/stdc++.h>
#include<mutex>
#include<unistd.h>
using namespace std;
mutex plate_mtx;                //盘子互斥量
condition_variable plate_cv;    //条件通知
//三个条件标记
bool appleIsReady = false;      //苹果已准备好
bool orangeIsReady = false;     //橘子已准备好
bool plateIsEmpty = true;       //盘子已经空了

void father() {
    int cnt = 1;
    while (true) {
        sleep(1);
        unique_lock<mutex> lck(plate_mtx);
        plate_cv.wait(lck, []{return plateIsEmpty;});
        //如果盘子不空,则准备苹果
        cout << "[Father] : I prepared my " << cnt;
        switch (cnt) {
            case 1 : cout << "st ";break;
            case 2 : cout << "nd ";break;
            case 3 : cout << "rd ";break;
            default: cout << "th ";break;
        }
        cnt++;
        cout << "apple." << endl;
        //修改盘子标记和苹果标记
        plateIsEmpty = false;
        appleIsReady = true;
        plate_cv.notify_all();
    }
}
void mother() {
    int cnt = 1;
    while (true) {
        sleep(1);
        unique_lock<mutex> lck(plate_mtx);
        plate_cv.wait(lck, []{return plateIsEmpty;});
        //如果盘子不空则准备橘子
        cout << "\t[Mother] : I prepared my " << cnt;
        switch (cnt) {
            case 1 : cout << "st ";break;
            case 2 : cout << "nd ";break;
            case 3 : cout << "rd ";break;
            default: cout << "th ";break;
        }
        cnt++;
        cout << "orange." << endl;
        //修改盘子标记和橘子标记
        plateIsEmpty = false;
        orangeIsReady = true;
        plate_cv.notify_all();
    }
}
void son() {
    while (true) {
        sleep(1);
        unique_lock<mutex> lck(plate_mtx);
        plate_cv.wait(lck, []{return !plateIsEmpty && appleIsReady;});
        //当盘子不空且苹果已经准备好的情况下拿苹果,并修改标记
        cout << "\t\t[Son] : I get an apple! Thank you, dad!" << endl;
        plateIsEmpty = true;
        appleIsReady = false;
        plate_cv.notify_all();
    }
}
void daughter() {
    while (true) {
        sleep(1);
        unique_lock<mutex> lck(plate_mtx);
        plate_cv.wait(lck, []{return !plateIsEmpty && orangeIsReady;});
        //当盘子不空且橘子已经准备好的情况下拿橘子,并修改标记
        cout << "\t\t\t[daughter] : I get an orange! Thank you, mom!" << endl;
        plateIsEmpty = true;
        orangeIsReady = false;
        plate_cv.notify_all();
    }
}

int main() {
    //创建线程
    thread father_thread(father);
    thread mother_thread(mother);
    thread son_thread(son);
    thread daughter_thread(daughter);
    //join所有线程
    father_thread.join();
    mother_thread.join();
    son_thread.join();
    daughter_thread.join();

    return 0;
}

  部分执行结果如下所示:


[Father] : I prepared my 1st apple.
                [Son] : I get an apple! Thank you, dad!
        [Mother] : I prepared my 1st orange.
                        [daughter] : I get an orange! Thank you, mom!
[Father] : I prepared my 2nd apple.
                [Son] : I get an apple! Thank you, dad!
        [Mother] : I prepared my 2nd orange.
                        [daughter] : I get an orange! Thank you, mom!
[Father] : I prepared my 3rd apple.
                [Son] : I get an apple! Thank you, dad!
        [Mother] : I prepared my 3rd orange.
                        [daughter] : I get an orange! Thank you, mom!
[Father] : I prepared my 4th apple.
                [Son] : I get an apple! Thank you, dad!
        [Mother] : I prepared my 4th orange.
                        [daughter] : I get an orange! Thank you, mom!
[Father] : I prepared my 5th apple.
                [Son] : I get an apple! Thank you, dad!
        [Mother] : I prepared my 5th orange.
                        [daughter] : I get an orange! Thank you, mom!
···

当珍惜每一片时光~