个人网站建设流程,汕头品牌建设公司,如何制作网络教程,中信建设目录
一.生产者消费者模型概念
1.为何要使用生产者消费者模型
2.生产者消费者之间的关系
3.生产者消费者模型的优点
二.基于阻塞队列的生产消费模型
1.在阻塞队列中的三种关系
2.BlockingQueue.hpp - 阻塞队列类
3.LockGurad.hpp - RAII互斥锁类
4.Task.hpp - 在阻塞队…目录
一.生产者消费者模型概念
1.为何要使用生产者消费者模型
2.生产者消费者之间的关系
3.生产者消费者模型的优点
二.基于阻塞队列的生产消费模型
1.在阻塞队列中的三种关系
2.BlockingQueue.hpp - 阻塞队列类
3.LockGurad.hpp - RAII互斥锁类
4.Task.hpp - 在阻塞队列中生产/消费的任务类
5.CPTest.cc - 测试
三.基于环形队列的生产消费模型
1.在环形队列中的三种关系
2.环形队列vs阻塞队列
3.CircularQueue.hpp - 环形队列类
4.LockGuard.hpp - RAII互斥锁类
5.Task.cpp - 任务类
6.CPTest.cc - 测试
四. 写在最后
1.关于条件变量的伪唤醒问题
2.关于信号量和锁的先后顺序问题 一.生产者消费者模型概念
1.为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者与消费者之间的强耦合问题, 生产者和消费者彼此之间不直接通讯, 而是通过特定容器来进行通讯, 所以生产者生产完数据之后不需要等待消费者处理, 直接扔给容器, 消费者不直接找生产者索要数据, 而是从容器中拿数据, 容器其实就是由特定的数据结构编写的缓冲区, 平衡了生产者和消费者的处理能力, 通过生产消费之间的容器达到解耦的目的 2.生产者消费者之间的关系 一个场所, 也就是上述容器 二个角色, 生产者与消费者 三种关系 生产者 - 生产者: 互斥关系 消费者 - 消费者: 互斥关系 生产者 - 消费者: 互斥关系, 同步关系 生产者消费者模型分为两种 1.单生产单消费 2.多生产多消费 3.生产者消费者模型的优点 1.将生产者消费者的强耦合关系解耦 2.支持并发 二.基于阻塞队列的生产消费模型
1.在阻塞队列中的三种关系 生产者与生产者之间互斥 消费者与消费者之间互斥 生产者与消费者之间既互斥又同步 同步关系使用条件变量 2.BlockingQueue.hpp - 阻塞队列类
#pragma once#include iostream
#include queue
#include LockGurad.hpptemplate class T
class BlockingQueue
{
public:static BlockingQueueT *GetInst(){return _sInst;}public:void push(const T in){LockGurad lock(_mutex); // RAII锁// 如果阻塞队列满了就阻塞while(isFull()){pthread_cond_wait(_isFullCond, _mutex); // 已满, 阻塞}_bq.push(in);pthread_cond_signal(_isEmptyCond); // 已经添加了一个, 唤醒已空条件阻塞}void pop(T* out){LockGurad lock(_mutex); // RAII锁// 如果阻塞队列空了就阻塞while(isEmpty()){pthread_cond_wait(_isEmptyCond, _mutex); // 已空, 阻塞}*out _bq.front();_bq.pop();pthread_cond_signal(_isFullCond); // 已经取走了一个, 唤醒已满条件阻塞}~BlockingQueue(){pthread_mutex_destroy(_mutex);pthread_cond_destroy(_isFullCond);pthread_cond_destroy(_isEmptyCond);}private:bool isFull() // 判满{return _bq.size() _capacity;}bool isEmpty() // 判空{return _bq.size() 0;}private:std::queueT _bq; // 阻塞队列size_t _capacity; // 阻塞队列最大容量pthread_mutex_t _mutex; // 保护阻塞队列的锁pthread_cond_t _isFullCond; // 判断是否为满pthread_cond_t _isEmptyCond; // 判断是否为空private:// 单例模式BlockingQueue(size_t capacity 5) : _capacity(capacity){pthread_mutex_init(_mutex, nullptr);pthread_cond_init(_isFullCond, nullptr);pthread_cond_init(_isEmptyCond, nullptr);}BlockingQueue(const BlockingQueueT copy) delete;BlockingQueueT operator(const BlockingQueueT copy) delete;static BlockingQueueT _sInst;
};template class T
BlockingQueueT BlockingQueueT::_sInst;
3.LockGurad.hpp - RAII互斥锁类
#pragma once#include pthread.hclass LockGurad
{
public:LockGurad(pthread_mutex_t* mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGurad(){pthread_mutex_unlock(_mutex);}private:pthread_mutex_t* _mutex;
};4.Task.hpp - 在阻塞队列中生产/消费的任务类
#pragma oncetypedef int(* func_t)(int, int);struct Task
{Task(){};Task(int x, int y, func_t func):_x(x),_y(y),_func(func){}int operator()(){return _func(_x, _y);}int _x;int _y;func_t _func;
};
5.CPTest.cc - 测试
#include BlockingQueue.hpp
#include Task.hpp
#include time.h
#include stdlib.h
#include sys/types.h
#include unistd.hint myAdd(int x, int y)
{return x y;
}void *Consumer(void *args)
{BlockingQueueTask *bqsInst (BlockingQueueTask *)args;while (1){// 消费任务Task t;bqsInst-pop(t);std::cout pthread_self() consumer: t._x t._y t() std::endl;sleep(1);}return nullptr;
}void *Provider(void *args)
{BlockingQueueTask *bqsInst (BlockingQueueTask *)args;while (1){// 制作任务int x rand() % 100 1;int y rand() % 100 1;Task t(x, y, myAdd);// 添加任务bqsInst-push(t);std::cout pthread_self() productor: t._x t._y ? std::endl;sleep(1);}return nullptr;
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);// 获取单例指针BlockingQueueTask *bqsInst BlockingQueueTask::GetInst();// 单生产单消费//pthread_t c, p;//pthread_create(c, nullptr, Consumer, bqsInst);//pthread_create(p, nullptr, Provider, bqsInst);// 多生产多消费pthread_t c[2], p[3];pthread_create(c, nullptr, Consumer, bqsInst);pthread_create(c 1, nullptr, Consumer, bqsInst); pthread_create(p, nullptr, Provider, bqsInst);pthread_create(p 1, nullptr, Provider, bqsInst);pthread_create(p 2, nullptr, Provider, bqsInst);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);return 0;
}
三.基于环形队列的生产消费模型
数据结构: 环形队列, 传送入口: http://t.csdn.cn/p1OXv
1.在环形队列中的三种关系 生产者与生产者之间互斥 消费者与消费者之间互斥 生产者与消费者之间同步 同步关系使用信号量 生产消费不需要互斥, 即分别用两把锁 2.环形队列vs阻塞队列
环形队列对比阻塞队列的优势: 生产与消费可以同时进行, 这是因为环形数据结构的特性, 以及信号量sem共同支持的
为什么?
对于阻塞队列而言 当阻塞队列中还存在有数据时, 生产者与消费者必然不会访问到同一个数据, 但是当队列为空, 如果不将生产与消费互斥的话, 就很可能在生产的同时又在消费, 消费的可能是正在生产中的数据, 就可能发生问题 对于环形队列而言 生产者与消费者的下标, 只有在环形队列满或空时指向同一位置, 通过信号量进行同步之后, 不可能会发生同时在同一位置生产消费的情况 生产消费指向同一位置, 只有两种情况 1.环形队列为满, 此时该位置是还存有未消费的数据, 没有多余的生产信号量, 该位置只可能存在消费行为 2.环形队列为空, 此时该位置不存有任何数据, 没有多余的消费信号量, 该位置只可能存在生产行为 3.CircularQueue.hpp - 环形队列类
#include vector
#include semaphore.h
#include LockGurad.hpptemplate class T
class CircularQueue
{
public:CircularQueue(size_t capacity) : _capacity(capacity), _cStep(0), _pStep(0){_v.resize(capacity);sem_init(_spaceSem, 0, capacity);sem_init(_dataSem, 0, 0);pthread_mutex_init(_cMtx, nullptr);pthread_mutex_init(_pMtx, nullptr);}void push(const T in){// 这里把信号量加在锁前面, 进一步提高效率, 原理类似于买票看电影, 提前买票在排队检票效率更高sem_wait(_spaceSem);{LockGurad lock(_pMtx); // RAII加锁风格_v[_pStep] in;_pStep % _capacity;}sem_post(_dataSem);}void pop(T *out){sem_wait(_dataSem);{LockGurad lock(_cMtx);*out _v[_cStep];_cStep % _capacity;}sem_post(_spaceSem);}~CircularQueue(){sem_destroy(_spaceSem);sem_destroy(_dataSem);pthread_mutex_destroy(_cMtx);pthread_mutex_destroy(_pMtx);}private:std::vectorT _v; // 环形队列size_t _capacity; // 最大容量int _cStep; // 消费下标int _pStep; // 生产下标sem_t _spaceSem; // 空间资源信号量sem_t _dataSem; // 数据资源信号量pthread_mutex_t _cMtx; // 生产锁pthread_mutex_t _pMtx; // 消费锁
};
4.LockGuard.hpp - RAII互斥锁类
#pragma once#include pthread.hclass LockGurad
{
public:LockGurad(pthread_mutex_t* mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGurad(){pthread_mutex_unlock(_mutex);}private:pthread_mutex_t* _mutex;
};5.Task.cpp - 任务类
#pragma oncetypedef int(* func_t)(int, int);struct Task
{Task(){};Task(int x, int y, func_t func):_x(x),_y(y),_func(func){}int operator()(){return _func(_x, _y);}int _x;int _y;func_t _func;
};
6.CPTest.cc - 测试
#include iostream
#include CircularQueue.hpp
#include Task.hpp
#include time.h
#include unistd.h
#include stdlib.hint myAdd(int x, int y)
{return x y;
}void* Consumer(void* args)
{CircularQueueTask* cq (CircularQueueTask*)args;while(1){// 消费任务Task t;cq-pop(t);std::cout 消费: t._x t._y t() std::endl;}
}void* Provider(void* args)
{CircularQueueTask* cq (CircularQueueTask*)args;while(1){// 制作任务int x rand() % 50 1;int y rand() % 50 1;Task t(x, y, myAdd);// 添加任务cq-push(t);std::cout 生产: t._x t._y ? std::endl;sleep(1);}
}int main()
{srand((unsigned int)time(nullptr) ^ 0x123);CircularQueueTask* cq new CircularQueueTask(5);pthread_t c[2], p[3];pthread_create(c, nullptr, Consumer, cq);pthread_create(c1, nullptr, Consumer, cq);pthread_create(p, nullptr, Provider, cq);pthread_create(p1, nullptr, Provider, cq);pthread_create(p2, nullptr, Provider, cq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);return 0;
}
四. 写在最后
1.关于条件变量的伪唤醒问题 为什么不用if, 而使用while, 原因就是如果此时只有1个资源, 而刚刚有很多个线程在挂起等待, 为了这1个资源全部被唤醒了, 那么就一定只有一个线程真正的拿到资源, 其他线程都是伪唤醒, 避免这种问题的方法就是用while判断, 当线程被唤醒时再循环判断一次, 如果为假就会跳出循环, 如果为真说明是伪唤醒, 则继续挂起等待
2.关于信号量和锁的先后顺序问题
值得一提的是, 环形队列中生产者消费者的同步关系由信号量来实现, 并且生产者与消费者之间没有互斥关系, 那么生产者与消费者分别使用两把锁维护其各自的互斥关系, 那么有一个值得思考的问题就是信号量和锁谁在前谁在后问题 答案是: 信号量先申请比较好. 信号量确实访问了临界资源, 但其内部实现也是原子性的, 而且信号量类似于预定机制, 只是提前把资源预定下来, 想要访问资源, 只需要等待取走锁资源即可