admin管理员组

文章数量:1599540

锁机制说明

mutex

互斥锁
用于多线程下保护资源同一时刻只有一个线程读写

std::mutex 是C++标准库中提供的互斥锁(Mutex)类,用于实现多线程之间的互斥访问。
它提供了两个主要的操作:lock() 和 unlock()。
当一个线程调用 lock() 函数时,如果锁没有被其他线程持有,那么该线程将获得锁,
否则,它会被阻塞,直到锁被释放。而当一个线程调用 unlock() 函数时,它释放了之前获得的锁。

condition_variable

条件变量
用于多线程下同步操作:A->B->A->B

std::condition_variable 是C++标准库中提供的条件变量(Condition Variable)类。
条件变量是一种线程间的同步机制,
它允许一个线程在某个特定条件下等待,而其他线程可以在满足这个条件时通知等待的线程继续执行。

消费者生产者模式下加锁代码

std::mutex mutex_; // 定义互斥锁
std::condition_variable cond_; // 定义条件变量

void producer() {
    // 生产数据
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // 生产数据
    } // 离开作用域时自动释放锁

    cond_.notify_one(); // 通知一个等待的消费者线程
}

void consumer() {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock); // 等待条件变量通知,同时释放锁,当被通知后重新获取锁

    // 消费数据
}

为什么需要2个变量

  1. std::mutex(互斥锁):std::mutex 是用来保护共享资源的,确保在任意时刻只有一个线程能够访问共享资源。当一个线程需要修改共享资源时,它必须先锁住互斥锁,以保证没有其他线程同时修改该资源。在上述例子中,std::mutex 被用于保护共享数据的读写,防止多个线程同时修改数据导致的竞态条件。

  2. std::condition_variable(条件变量):std::condition_variable 提供了一种机制,使得一个线程可以在满足特定条件时等待,同时允许其他线程在条件满足时通知等待的线程继续执行。在上述例子中,std::condition_variable 被用于在线程之间建立通信。当一个线程在等待某个条件成立时,它会通过 std::condition_variable 的 wait() 函数主动放弃锁,并且进入等待状态。另外一个线程在某个时刻通过条件变量的 notify_one() 或 notify_all() 函数,通知等待的线程条件已经满足,可以继续执行。


c++11新特性 lock_guardunique_lock

lock_guard

在作用域内自动锁定和解锁
不需要手动管理锁的释放

使用场景:

  • 当需要在某个作用域内对某个资源进行互斥访问时,可以使用 std::lock_guard。因为它的生命周期和作用域相对较小,所以更加轻量

示例代码:

std::mutex mutex_;
{
    std::lock_guard<std::mutex> lock(mutex_); // 在作用域内自动锁定和解锁
    // 执行需要互斥访问的操作
} // 在作用域结束时自动释放锁

unique_lock

在作用域结束时自动释放锁
在作用域内手动控制锁的锁定和解锁

使用场景:

  • 当你需要更灵活地控制锁的生命周期,或者需要在锁定期间执行一些可能引发异常的代码时,可以选择使用 std::unique_lock

示例代码:

std::mutex mutex_;
{
    std::unique_lock<std::mutex> lock(mutex_); // 手动控制锁的锁定和解锁
    // 执行需要互斥访问的操作
    lock.unlock(); // 手动释放锁
    // 执行不需要互斥访问的操作
} // 在作用域结束时自动释放锁

加锁队列示例demo

avpacketqueue.h

#ifndef AVPACKETQUEUE_H
#define AVPACKETQUEUE_H
#include <mutex>
#include <condition_variable>
#include <queue>
#include <list>

#ifdef __cplusplus
extern "C"
{
// 包含ffmpeg头文件
//#include "libavutil/avutil.h"
//#include "libavformat/avformat.h"
#include "libavcodec/avcodec.h"
}
#endif

/*
    包队列
    - 底层结构采用双链表list
*/

class AVPacketQueue
{
public:
    AVPacketQueue();
    ~AVPacketQueue();

    //进队列
    int Push(AVPacket *val);
    //出队列
    AVPacket *Pop(const int timeout);

    //获取队列大小
    int Size();

private:
    //释放资源接口
    void release();

    //控制队列中数据包的总播放时间,单位s
    int duration_cur=0;
    int duration_MAX=300;//最大缓存5分钟,5*60=300s

    //多线程同步设置
    std::mutex mutex_;              //互斥锁,用于防止资源竞争
    std::condition_variable cond_;  //pv条件变量,用于线程同步

    //链表队列,因为是音视频数据,保存包指针即可,不要保存包,避免爆内存
    std::queue<AVPacket *, std::list<AVPacket *>> queue_;
};

#endif // AVPACKETQUEUE_H

avpacketqueue.cpp

#include "avpacketqueue.h"

//构造函数
AVPacketQueue::AVPacketQueue()
{

}

//析构函数
AVPacketQueue::~AVPacketQueue()
{


}

//入队,包进队列
int AVPacketQueue::Push(AVPacket *val,const int timeout)
{
    //加作用域互斥锁:资源同一时间只能有一个线程访问,作用域结束自动释放锁
    std::lock_guard<std::mutex> lock(mutex_);
    //因为是包队列-双链表
    //如果队满了,等待timeout时间
    if(duration_cur>=duration_MAX) {
        // 等待pop或者超时唤醒
        cond_.wait_for(lock, std::chrono::milliseconds(timeout), [this] {
            return duration_cur<duration_MAX;
        });
    }

    //如果依然队满,就直接返回-1
    if(duration_cur>=duration_MAX){
        return -1;
    }

    //入队
    queue_.push(val);
    //增加队列缓存时间
    this->duration_cur+=val->duration;

    cond_.notify_one();//条件变量:通知其他线程可以继续执行
    return 0;
}

//出队,包出队列,获取包
AVPacket *AVPacketQueue::Pop(const int timeout)
{
    std::lock_guard<std::mutex> lock(mutex_); //作用域互斥锁
    //如果空了,等待timeout时间
    if(queue_.empty()) {
        // 等待push或者超时唤醒
        cond_.wait_for(lock, std::chrono::milliseconds(timeout), [this] {
            return !queue_.empty() || abort_;
        });
    }
    if(queue_.empty()){
        return nullptr;
    }

    //出队
    AVPacket * val = queue_.pop();
    //减少队列缓存时间
    this->duration_cur-=val->duration;

    //获取最后一个节点的值
    return val;

}



本文标签: 示例机制代码conditionvariablemutex