条件变量

背景介绍

  对于一般的生产者-消费者模型,生产者会产生数据供消费者使用。生产者需要在准备好数据的时候通知消费者,消费者通过判断是否已经有数据来决定要不要消费数据。一般情况下,生产者和消费者的实现如下所示:

#include<bits/stdc++.h>
#include<mutex>
#include<unistd.h>
using namespace std;
vector<int> buffer;
mutex mtx;
bool dataIsReady = false;
void producer(){
while (true) {
sleep(2);
std::unique_lock<std::mutex> lk(mtx);
//随机生成10个数字
for (int i = 0; i < 10; i++) {
buffer.push_back(rand()%20);
}
dataIsReady = true;
}
}
void consumer(){
while (true) {
sleep(2);
std::unique_lock<std::mutex> lk(mtx);
if (!dataIsReady)
continue;
//输出10个数字之后清空数组
for (int i = 0; i < buffer.size(); i++) {
if (i) putchar(' ');
cout << buffer[i];
}
cout << endl;
buffer.clear();
dataIsReady = false;
}
}
int main(){
srand(time(NULL));
thread produc(producer);
thread consum(consumer);
produc.join();
consum.join();
return 0;
}

  运行结果如下所示:

14 8 6 13 18 2 16 5 3 14
10 14 4 0 15 12 8 13 1 3
17 0 15 5 4 6 1 4 17 11
0 3 19 6 17 18 8 5 15 4
0 5 10 16 18 18 1 6 11 2
10 0 15 17 18 11 4 11 15 13
2 15 9 14 13 18 12 14 3 19
18 3 17 0 12 15 18 13 13 1
7 3 2 14 13 0 18 9 3 13
...

  这种写法可以正常工作,但是存在一个问题:每次consumer线程都会循环判断数据是否准备好,这个过程中线程不会让出资源,因此循环判断会带来不必要的开销。正因如此,才需要学习条件变量(condition_variable)。

条件变量

  条件变量(condition_variable)是C++中的一个类,能够阻塞线程直到相关条件满足时被唤醒。条件变量使用wait函数阻塞线程,wait函数一般与unique_lock配合使用。直到有其他线程中的同一个条件变量调用了唤醒函数,当前被wait阻塞的线程才会被唤醒。使用条件变量可以再相关条件不满足时阻塞线程以让出资源,防止因为不断的循环加锁判定带来的开销。
  除了构造函数和析构函数以外,condition_variable类的成员函数有wait函数和notify函数:

wait函数

  wait函数有三个,分别为wait、wait_for、wait_until,这里着重介绍一下wait方法:
  wait函数有以下两个版本:

类型 函数原型
unconditional (1) void wait (unique_lock<mutex>& lck);
predicate (2) template<class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);

  (1)参数lck
  一个加了锁的unique_lock对象,所有调用了当前条件变量wait方法的对象都应该在下层使用同一个mutex对象。
  (2)参数pred
  一个可以调用的对象或者函数,函数或者对象没有参数并且需要返回一个bool类型的值,线程将会不停的调用wait函数直到该返回值为true;
  对于以上两个版本的函数,其使用效果如下所示,且以下示例等价:

std::mutex mtx;
std::condition_variable cv;
bool isReady;
//写法一
while(true) {
sleep(1);
std::unique_lock<std::mutext> lk(mtx);
while (!isReady) {
cv.wait(lk);
}
}
//写法二
while (true) {
sleep(1);
std::unique_lock<std::mutext> lk(mtx);
cv.wait(lk, []{return isReady;});
}

  使用lambda函数更显简洁,因此建议第二种写法。

notify函数

  notify函数有两个,分别为notify_one和notify_all。
  (1)notify_one
  唤醒一个满足该条件变量wait函数的线程,如果没有线程处于等待状态,那么此函数不做任何事。如果等待的线程有多个,那么随机选择一个线程唤醒。
  (2)notify_all
  唤醒所有满足该条件变量wait函数的线程,如果没有线程处于等待状态,那么此函数不做任何事。
  notify函数的示例如下所示:

// condition_variable::notify_all
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void print_id (int id) {
std::unique_lock<std::mutex> lck(mtx);
while (!ready) cv.wait(lck);
// ...
std::cout << "thread " << id << '\n';
}
void go() {
//修改ready标记,并通知打印线程工作
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_all();
}
int main (){
std::thread threads[10];
// 创建10个线程,每个线程当ready标记为真时打印自己的id号
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_id,i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto& th : threads) th.join();
return 0;
}

  执行结果如下所示:

10 threads ready to race...
thread 0
thread 1
thread 2
thread 3
thread 4
thread 5
thread 6
thread 7
thread 8
thread 9

  程序在调用了go()函数后,修改了ready标记,然后通知其他线程工作,其他线程便不再阻塞,完成打印工作。

条件变量的使用

  熟悉了条件变量的常用变量及其使用方法,那么我们可以用来实现常见的模型,比如生产者-消费者模型。

生产者-消费者模型

  生产者-消费者模型中生产者负责生产数据,消费者负责消费数据。这里使用5个生产者和5个消费者对一个buff进行操作,这个buffer使用队列存储数据,生产者生产的数据按照顺序存入,消费者在buffer不空时从队列头部取出数据。为了防止生产者产生太多的数据,这里给buffer添加了容量限制。
  生产者-消费者模型的结构如下所示:

  其代码实现如下所示:

#include<bits/stdc++.h>
#include<unistd.h>
#include<mutex>
#include<condition_variable>
using namespace std;
std::mutex mtx; //mutex变量
std::condition_variable cv; //condition_variable变量
std::queue<int> databuffer; //数据buffer
//队列的最大容量
const int MAXSIZE = 100;
void producer(){
while (true) {
sleep(5);
//获取锁
std::unique_lock<std::mutex> lk(mtx);
//等待,直到数据不满则生产数据
cv.wait(lk,[]{return databuffer.size() < MAXSIZE;});
int val = rand();
databuffer.push(val);
cout << "[ Producer Thread ]-Thread_id: " << std::this_thread::get_id() << " Push " << val << endl;
//生产数据通知其他线程
cv.notify_all();
}
}
void consumer(){
while (true) {
sleep(5);
//带参数的构造函数 创建之后立即加锁,销毁对象时会自动解锁,所以定义在局部使用会比较方便
std::unique_lock<std::mutex> lk(mtx);
//等待,直到数据buffer不空时消费数据
cv.wait(lk,[]{return !databuffer.empty();});
int val = databuffer.front();
databuffer.pop();
cout << "[ Consumer Thread ]-Thread_id: " << std::this_thread::get_id() << " Pop " << val << endl;
//消费数据后通知其他线程
cv.notify_all();
}
}
int main(){
srand(time(nullptr));
vector<thread> producers;
vector<thread> consumers;
//创建线程
for (int i =0 ; i < 5; i++) {
producers.push_back(thread(producer));
consumers.push_back(thread(consumer));
}
for (int i =0 ; i < 5; i++) {
producers[i].join();
consumers[i].join();
}
return 0;
}

  部分运行结果如下所示:

[ Producer Thread ]-Thread_id: 140737348204288 Push 1064460442
[ Consumer Thread ]-Thread_id: 140737339811584 Pop 1064460442
[ Producer Thread ]-Thread_id: 140737331418880 Push 1572334329
[ Consumer Thread ]-Thread_id: 140737323026176 Pop 1572334329
[ Producer Thread ]-Thread_id: 140737314633472 Push 53770771
[ Consumer Thread ]-Thread_id: 140737306240768 Pop 53770771
[ Producer Thread ]-Thread_id: 140737297848064 Push 1843725020
[ Consumer Thread ]-Thread_id: 140737289455360 Pop 1843725020
[ Producer Thread ]-Thread_id: 140737281062656 Push 1810443650
[ Consumer Thread ]-Thread_id: 140737272669952 Pop 1810443650
[ Producer Thread ]-Thread_id: 140737348204288 Push 165477773
[ Consumer Thread ]-Thread_id: 140737339811584 Pop 165477773
[ Producer Thread ]-Thread_id: 140737331418880 Push 24357394
[ Producer Thread ]-Thread_id: 140737314633472 Push 2070814110
[ Consumer Thread ]-Thread_id: 140737323026176 Pop 24357394
[ Consumer Thread ]-Thread_id: 140737306240768 Pop 2070814110
[ Producer Thread ]-Thread_id: 140737297848064 Push 1358644763
...

  参考博客C++11条件变量使用详解.

父母子女-水果问题

  问题描述:
  父亲、母亲分别向一个果盘中放置一个水果。父亲放置苹果,母亲放置橘子。儿子专门等待果盘中的苹果。女儿专门等待果盘中的橘子。当果盘中准备好水果以后,儿子和女儿分别根据自己的需要拿走水果。
  这其实也是一个生产者和消费者的模型,使用条件变量实现如下:

#include<bits/stdc++.h>
#include<mutex>
#include<unistd.h>
using namespace std;
mutex plate_mtx; //盘子互斥量
condition_variable plate_cv; //条件通知
//三个条件标记
bool appleIsReady = false; //苹果已准备好
bool orangeIsReady = false; //橘子已准备好
bool plateIsEmpty = true; //盘子已经空了
void father() {
int cnt = 1;
while (true) {
sleep(1);
unique_lock<mutex> lck(plate_mtx);
plate_cv.wait(lck, []{return plateIsEmpty;});
//如果盘子不空,则准备苹果
cout << "[Father] : I prepared my " << cnt;
switch (cnt) {
case 1 : cout << "st ";break;
case 2 : cout << "nd ";break;
case 3 : cout << "rd ";break;
default: cout << "th ";break;
}
cnt++;
cout << "apple." << endl;
//修改盘子标记和苹果标记
plateIsEmpty = false;
appleIsReady = true;
plate_cv.notify_all();
}
}
void mother() {
int cnt = 1;
while (true) {
sleep(1);
unique_lock<mutex> lck(plate_mtx);
plate_cv.wait(lck, []{return plateIsEmpty;});
//如果盘子不空则准备橘子
cout << "\t[Mother] : I prepared my " << cnt;
switch (cnt) {
case 1 : cout << "st ";break;
case 2 : cout << "nd ";break;
case 3 : cout << "rd ";break;
default: cout << "th ";break;
}
cnt++;
cout << "orange." << endl;
//修改盘子标记和橘子标记
plateIsEmpty = false;
orangeIsReady = true;
plate_cv.notify_all();
}
}
void son() {
while (true) {
sleep(1);
unique_lock<mutex> lck(plate_mtx);
plate_cv.wait(lck, []{return !plateIsEmpty && appleIsReady;});
//当盘子不空且苹果已经准备好的情况下拿苹果,并修改标记
cout << "\t\t[Son] : I get an apple! Thank you, dad!" << endl;
plateIsEmpty = true;
appleIsReady = false;
plate_cv.notify_all();
}
}
void daughter() {
while (true) {
sleep(1);
unique_lock<mutex> lck(plate_mtx);
plate_cv.wait(lck, []{return !plateIsEmpty && orangeIsReady;});
//当盘子不空且橘子已经准备好的情况下拿橘子,并修改标记
cout << "\t\t\t[daughter] : I get an orange! Thank you, mom!" << endl;
plateIsEmpty = true;
orangeIsReady = false;
plate_cv.notify_all();
}
}
int main() {
//创建线程
thread father_thread(father);
thread mother_thread(mother);
thread son_thread(son);
thread daughter_thread(daughter);
//join所有线程
father_thread.join();
mother_thread.join();
son_thread.join();
daughter_thread.join();
return 0;
}

  部分执行结果如下所示:

[Father] : I prepared my 1st apple.
[Son] : I get an apple! Thank you, dad!
[Mother] : I prepared my 1st orange.
[daughter] : I get an orange! Thank you, mom!
[Father] : I prepared my 2nd apple.
[Son] : I get an apple! Thank you, dad!
[Mother] : I prepared my 2nd orange.
[daughter] : I get an orange! Thank you, mom!
[Father] : I prepared my 3rd apple.
[Son] : I get an apple! Thank you, dad!
[Mother] : I prepared my 3rd orange.
[daughter] : I get an orange! Thank you, mom!
[Father] : I prepared my 4th apple.
[Son] : I get an apple! Thank you, dad!
[Mother] : I prepared my 4th orange.
[daughter] : I get an orange! Thank you, mom!
[Father] : I prepared my 5th apple.
[Son] : I get an apple! Thank you, dad!
[Mother] : I prepared my 5th orange.
[daughter] : I get an orange! Thank you, mom!
···

当珍惜每一片时光~