condition_variable是C++并发编程中常用的一个类, 通过对它的使用, 可以解决并发程序设计中的一些问题.

先决条件

要使用condition_variable, 请确保

  • C++11或更新的标准
  • #include <condition_variable>

常用方法

方法说明
notify_all()恢复(唤醒)所有等待中的线程
notify_one()恢复(唤醒)某个等待中的线程
wait()使线程进入等待状态
wait_for()使线程进入等待状态, 并设置线程等待持续的时间
wait_until()使线程进入等待状态, 并设置线程等待结束的最晚时间点
表 condition_variable的常用方法

解释

显而易见, condition_variable为我们提供了一种控制线程 暂停/进行 的工具, 它帮助我们使有关的线程在特定条件下暂停运行或继续运行.

当某个 condition_variable 对象的 wait() 或 wait_for()/wait_until() 被调用时, 调用它的线程会进入阻塞状态, 也就是暂停运行, 直到其他线程调用了同一个 condition_variable 对象的 notify_all() 或 notify_one() 方法 (调用 notify_one() 时有可能但不一定被唤醒) . 这些提到的类似的方法的区别将会在下面解释.

wait() 方法有两种重载. 第一种是 wait(unique_lock<mutex>&), 它使线程无条件地进入等待状态. 第二种是wait(unique<mutex>&, _Predicate), 它使线程在条件 _Predicate 为 false 的情况下保持等待状态. 需要注意, _Predicate 参数必须是一个可调用的对象 (也就是可以像调用一个不需要参数, 且返回 bool 值的函数一样使用 () 来调用它. 可以是一个lambda表达式, 可以是一个函数, 也可以是一个重载了()操作符的类的对象).

上面提到的两种 wait() 方法都需要一个 unique_lock<mutex> 型的参数, 这个参数一般是由一个全局的 mutex 互斥锁构造的 unique_lock 对象. 在这个对象被作为参数传入的时候, 它必须处于已经被 lock 的状态. 当我们使用 mutex 构造 unique_lock 的时候, 使用的 mutex 会被自动 lock, 因此只要在调用 wait 之前没有对它解锁就没有问题.

当上述 unique_lock 被传入后, 被调用 wait() 的 condition_variable 对象会将其解锁, 同时阻塞调用 wait() 的线程, 直到其他线程调用 notify_all() 或 notify_one() 时, 会重新锁上传入的 unique_lock, 然后恢复被阻塞的线程.

注意, 调用 wait() 时互斥锁会被暂时解锁, 也就意味着此时这个线程失去了对这个互斥锁的所有权, 而其他线程可以抢占这个锁. 当其他线程占有了这个锁, 又调用了 wait() 后, 将会继续发生同样的上述的过程……

wait_for() 和 wait_until() 的作用和 wait() 类似, 通过它的名字就能猜出它的作用, 本文不做赘述.

紧接着, 我们将目光转向另一边的 notify(). 调用某个 condition_variable 对象的 notify_all() 方法时, 将会使所有因为调用了这个对象的 wait() 方法的线程恢复运行. 但是, 当这些线程在调用 wait() 时使用了同一个互斥锁时, 它们将不会同时开始运行, 因为它们会抢占这一个锁, 抢到者继续运行, 否则就继续等待, 直到抢到这个锁. 相反, 如果这些线程没有使用同一个锁, 那么它们将各自运行.

与此同时, 我们关注 notify_one() 方法. notify_one() 的作用和上面讲过的 notify_all() 类似, 只不过它每次被调用后只唤醒一个线程而不是所有线程.

示例

现在来看看 condition_variable 到底怎么用, 以及有什么效果.

样例代码 (源文件见附件)

Symbol.cpp :

#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

using thread             = std::thread;
using condition_variable = std::condition_variable;
using mutex              = std::mutex;
using unique_lock        = std::unique_lock<std::mutex>;
using unique_ptr         = std::unique_ptr<thread>;
using std::cout;

condition_variable cv; // 全局的条件变量
mutex mtx; // 全局的互斥锁

void ThreadFunc(int val){
    unique_lock lock(mtx);
    cout << "Thread " << val << " started!!\n";
    cv.wait(lock);

    cout << "Thread " << val << " is not waiting anymore!!\n";
}

int main(void){
    const int NUM_THREAD = 10;
    unique_ptr aThread[NUM_THREAD]; // 创建10个指向线程对象的智能指针
    for(int i = 0; i < NUM_THREAD; ++i)
        aThread[i].reset(new thread(ThreadFunc, i)); // 实例化线程对象
    // 等待 100 毫秒, 确保所有线程都进入了阻塞状态
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    
    cout << "---------------------------\n";
    cout << "Notify all the threads now.\n";
    cout << "---------------------------\n";

    cv.notify_all();

    for(int i = 0; i < NUM_THREAD; ++i)
        if(aThread[i]->joinable())
            aThread[i]->join(); // 确保所有线程正常结束
    
    cout << "---------------------------\n";
    cout << "Finished.\n";
    return 0;
}

运行上面的代码将会发生什么事情? 来看一看 (测试环境: Microsoft Windows 11, PowerShell. 一般情况其他环境也能得到类似的结果. )

首先生成可执行文件.

g++ -g Symbol.cpp -o Symbol.exe

然后运行得到的程序.

./Symbol.exe

最后看看结果如何.

Thread 0 started!!
Thread 1 started!!
Thread 2 started!!
Thread 5 started!!
Thread 4 started!!
Thread 3 started!!
Thread 6 started!!
Thread 7 started!!
Thread 8 started!!
Thread 9 started!!
---------------------------
Notify all the threads now.
---------------------------
Thread 7 is not waiting anymore!!
Thread 8 is not waiting anymore!!
Thread 9 is not waiting anymore!!
Thread 2 is not waiting anymore!!
Thread 1 is not waiting anymore!!
Thread 6 is not waiting anymore!!
Thread 3 is not waiting anymore!!
Thread 4 is not waiting anymore!!
Thread 5 is not waiting anymore!!
Thread 0 is not waiting anymore!!
---------------------------
Finished.

可以看到, 运行程序后10个线程依次被创建, 然后开始运行, 按数字 0~9 输出了提示信息. 在输出了提示信息后, 线程各自进入等待状态, 暂停运行.

创建完线程后, 我们的主线程等待 100ms, 确保所有线程都有充足时间来进入等待状态. 紧接着, 主线程输出提示信息, 然后调用全局的 condition_variable 的 notify_all().

notify_all() 被调用后, 所有线程便开始抢占全局有且仅有一个的互斥锁 mutex, 抢到的就输出提示信息, 没抢到就继续等. 因为这些线程不是依次进入等待状态, 也不是依次被唤醒的, 所以我们看到的, 就是这10个线程以被打乱的顺序分别输出提示信息.

所有的线程完成输出提示信息的任务后, 该结束的都结束了, 还没完事的, 我们就调用 join() 等它完成. 这些线程退出舞台后, 我们的主线程输出最后一条提示信息, 示意所有事情都完成了, 然后程序结束.

写在最后

如果您觉得这篇文章写得不错, 或者解决了您不懂的问题, 烦请您把它分享给其他朋友, 谢谢!

如果上面有 不清晰的/不准确的/错误的 内容, 请您及时联系.

附件 (源码及程序):

 Save as PDF
最后修改日期: 2022-06-06

作者

留言

撰写回覆或留言