【Linux】生产者消费者模型
文章目錄
- 一. 什么是生產者消費者模型
- 1. 基本概念
- 2. 三種關系
- 3. 再次理解生產者消費者模型
- 二. 生產者消費者模型的優點
- 三. 基于BlockingQueue的生產者消費者模型
- 1. 準備工作
- 2. 阻塞隊列實現
- 3. 測試阻塞隊列
- 4. 阻塞隊列完整代碼
- 5. 關于改進阻塞隊列的幾點補充
- 5.1 多生產者多消費者的設計
- 5.2 阻塞隊列所存儲數據可以是更復雜的任務
- 四. 基于環形隊列的生產者消費者模型
- 1. 基本規則
- 2. 環形隊列的實現
- 2.3 單生產者單消費者
- 2.4 多生產者多消費者
一. 什么是生產者消費者模型
1. 基本概念
生產者消費者模型就是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而是通過容器來進行通訊,即生產者生產完數據之后不用等待消費者處理,直接扔給容器;消費者不找生產者要數據,而是直接從容器里取。
2. 三種關系
實際中,生產者可能有多個,消費者也可能有多個,它們彼此之間要應該滿足什么關系呢?
假設下面的情景:
- 每次一個生產者一次只能生產一個數據,
- 每次一個消費者一次只能消費一個數據
- 唯一的容器容器每次只容許一個生產者push數據或一個消費者pop數據。
在滿足上面的情景下,可以推測生產者、消費者彼此之間的關系:
- [生產者和生產者]:互斥與同步關系。互斥體現在所有生產者競爭,只有一個能去容器pop數據。同步的話要保證每一個生產者都有機會到容器中pop數據。
- [消費者和消費者]:互斥與同步關系。互斥體現在所有消費者競爭,只有一個能去容器push數據。同步要求每一個消費者都有機會去容器中push數據。
- [生產者和消費者]:互斥與同步關系。互斥體現在二者只有一個能先訪問容器,這時另外一個只能阻塞等待。同步體現在容器不能永遠只是生產者在push或消費者在pop,生產者生產了一些數據后要告知消費者來消費,反之亦然。
3. 再次理解生產者消費者模型
生產者消費者模型的核心思想在于:眾多的生產者和眾多的消費者通過唯一的容器進行數據交互,在交互的同時必須維護好彼此之間的互斥與同步的關系。
二. 生產者消費者模型的優點
容器就相當于一個緩沖區,平衡了生產者和消費者的數據處理能力。這個容器就是用來給生產者和消費者解耦的。假如只是一對一的生產和消費,快的那方必須等待慢的那方才能完成一次交易,然后繼續下一組;而如果它們之間有一個容器可以存儲數據,其中一個生產者把數據push到容器后不用等消費者,下一個生產者繼續往容器里push數據,也就是說在容器滿之前生產者可以一直連續的生產數據,消費者也是一樣的道理。
即通過容器使生產者和消費者解耦,提高了它們數據交互的效率。
三. 基于BlockingQueue的生產者消費者模型
在多線程編程中阻塞隊列(Blocking Queue)是一種常用于實現生產者和消費者模型的數據結構,它有如下如下幾個特點:
- 眾多生產者中先內部競爭出一個生產者,去阻塞隊列中生產一個數據,完成之后重新內部競爭。
- 眾多消費者中也是內部競爭出一個消費者,去阻塞隊列里拿取一個數據,拿到后重新內部競爭。
- 每次只能有一個線程操作隊列,要么是消費者pop,要么是生產push。
- 當隊列為空時,消費者通知生產者來生產數據,然后自己會被阻塞等待,直到合適的時候生產者把它喚醒過來提醒它消費;當隊列滿時,生產者通知消費者過來拿取數據,然后自己被阻塞等待,直到消費者把它喚醒,叫它繼續生產。
1. 準備工作
從最簡單的開始設計,只有一個生產者和一個消費者,創建兩個線程代表它們,后續它們將在自己的控制流中完成相應的生產和消費任務;至于它們進行數據交互的容器,使用STL的容器適配器queue即可,交互的數據類型為整數。
在主線程中創建好生產者、消費者線程還有阻塞隊列:
int main() { srand((unsigned int)time(nullptr)); // 1、new一個阻塞隊列 BlockQueue<int>* p = new BlockQueue<int>; // 2、創建兩個新線程,分別代表生產者和消費者 pthread_t pro, con; pthread_create(&pro, nullptr, ProducerAction, p); pthread_create(&con, nullptr, ConsumerAction, p); // 3、主線程等待它們完成任務后負責銷毀阻塞隊列 pthread_join(pro, nullptr); pthread_join(pro, nullptr); delete p; return 0; }2. 阻塞隊列實現
基本框架
阻塞隊列中包含4個成員變量:
- _q,一個普通隊列,用來存儲數據。
- _capacity,阻塞隊列的容量,默認可以存5個數據。
- full,一個條件變量。當阻塞隊列滿時生產者在該條件下等待。
- empty,一個條件變量。當阻塞隊列空時消費者在該條件下等待。
- mutex,一把互斥鎖。保證所有時間內只有一個線程能操作隊列。
構造函數負責初始化兩個條件變量和鎖,析構函數負責銷毀它們:
template<class T> class BlockQueue { public:// 構造函數BlockQueue(size_t capcity = 5):_capacity(capcity){pthread_cond_init(&full, nullptr);pthread_cond_init(&empty, nullptr);pthread_mutex_init(&mutex, nullptr);}// 析構函數~BlockQueue(){pthread_cond_destroy(&full);pthread_cond_destroy(&empty);pthread_mutex_destroy(&mutex);}// 生產者插入數據void PushData(T data){};// 消費者刪除數據void PopData(T& data);private: // 判斷阻塞隊列是否為空bool IsFull() { return _q.size() >= _capacity; } // 判斷阻塞隊列是否為滿bool IsEmpty() { return _q.empty(); } queue<T> _q; size_t _capacity; pthread_cond_t full; pthread_cond_t empty; pthread_mutex_t mutex; };生產者生產數據
成員函數void PushData(T data)由生產者調用,功能是插入一個數據到阻塞隊列中,下面是該函數的幾點說明:
- 該函數一進來就要申請鎖,最后插入完成釋放鎖。
- 插入數據之前要檢查阻塞隊列是否滿了,如果滿了就要需要通知消費者來消費,然后自己在full條件下等待。
消費者拿取數據
消費者可以調用阻塞隊列里的成員函數void PopData(T& data)拿走一個阻塞隊列里的數據,下面是該函數的幾點說明:
- 消費者調用時需要傳入一個輸出型參數。阻塞隊列會把隊頭數據內容寫入到輸出型參數的內存空間中。
- 進來的第一步先申請鎖,拿走數據后釋放鎖。
- 拿取數據之前要檢查阻塞隊列是否為空,為空的話要通知生產者進行生產,然后自己在empty條件下等待。
關于阻塞隊列生產、拿取數據操作的幾個問題
問題一:判斷阻塞隊列空滿時為什么要用while循環,而不用if判斷語句?
拿生產者來說,它在插入前隊列已經滿了,如果用if判斷語句的話,在if里面要執行pthread_cond_wait()等待條件full滿足,當這個生產者被喚醒后執行if外面的push插入數據。但是如果pthread_cond_wait()等待出錯了,直接退出if語句會繼續往下執行push操作,導致本來已經滿了的隊列多插入了一個數據;如果我們用while循環的話,即使等待出錯了,這時還會重新回去判斷隊列是否滿了,這樣可以避免隊列數據出錯的問題。
問題二:判空和判滿邏輯中,能不能先等待再喚醒?
答案是不行的,首先對于訪問阻塞隊列的鎖mutex,生產者和消費者是共同競爭的,如果這個線程先等待的話鎖被釋放了,但是它不會繼續往下執行喚醒另一個線程的操作了(因為這個線程自己也在等待被對方喚醒),最后導致鎖沒人申請,線程都等待各自的條件下死等待。
正確的邏輯是先喚醒對方,然后自己在對應的條件變量下等待;后面等到條件成熟時對方把自己喚醒。即我們在設計條件變量時要注意:條件變量在等待被喚醒時需要重新對條件進行判斷,是否條件滿足。
3. 測試阻塞隊列
下面是生產者線程的控制流,由于只有一個生產者所以不用在其控制流中加鎖和引入條件變量來維護生產者和生產者之間的同步與互斥關系。
我們讓生產者每隔一秒生產一個數據:
void* ProducerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = rand()%100+1; p->PushData(data); cout<<"[producer] push data:"<<data<<endl;sleep(1); } }消費者每隔一秒拿取一個數據:
void* ConsumerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = 0; p->PopData(data); cout<<"[consumer] get data:"<<data<<endl; sleep(1); } }下面是main.cpp的全部代碼:
// 包含所有需要的頭文件和阻塞隊列的定義 #include "blockqueue.h" // 生產者線程控制流 void* ProducerAction(void* arg) {BlockQueue<int>* p = (BlockQueue<int>*)arg;while(true){int data = rand()%100+1;p->PushData(data);cout<<"[producer] push data:"<<data<<endl;} }// 消費者線程控制流 void* ConsumerAction(void* arg) {BlockQueue<int>* p = (BlockQueue<int>*)arg;while(true){int data = 0;p->PopData(data);cout<<"[consumer] get data:"<<data<<endl;sleep(2);} }int main() {srand((unsigned int)time(nullptr));// 1、new一個阻塞隊列BlockQueue<int>* p = new BlockQueue<int>;// 2、創建兩個新線程,分別代表生產者和消費者pthread_t pro, con;pthread_create(&pro, nullptr, ProducerAction, p);pthread_create(&con, nullptr, ConsumerAction, p);// 3、主線程等待它們完成任務后負責銷毀阻塞隊列pthread_join(pro, nullptr);pthread_join(pro, nullptr);delete p;return 0; }編譯運行,發現每生產一個數據馬上又被消費者拿走了,這種情況隊列永遠都不會滿:
另外由于我們是先創建生產者線程,再創建消費者線程。所以是生產者先生產,消費者后消費。
如果我們先創建消費者線程的話,消費者線程先拿到隊列鎖,正欲拿取數據時發現隊列為空,然后自己會在條件empty下阻塞掛起并且釋放操作隊列的鎖mutex(注意,如果有多個消費者的話,它們是沒有機會搶這把鎖的,因為它們在搶操作隊列的這個鎖之前必須要獲得內部競爭的鎖);等到生產者線程輪流生產完所有數據之后,最后一個生產者發現隊列已經滿了就會喚醒被一開始被阻塞掛起的消費者來消費;在所有消費者線程拿走完隊列數據之前,這個生產者需要一直阻塞等待:
我們先創建消費者線程,消費者發現隊列為空后輸出“queue is empty”,然后阻塞掛起等待生產者生產完所有數據后喚醒這個消費者線程:
4. 阻塞隊列完整代碼
分兩個文件:
- 頭文件blockqueue.h里包含阻塞隊列的聲明。
- main.cpp:負責創建生產者、消費者線程并聲明它們的執行邏輯。
blockqueue.h
#pragma once #include <queue> #include <unistd.h> #include <stdlib.h> #include <iostream> #include <pthread.h> using namespace std; template<class T> class BlockQueue { public: BlockQueue(size_t capcity = 5) :_capacity(capcity) { pthread_cond_init(&full, nullptr); pthread_cond_init(&empty, nullptr); pthread_mutex_init(&mutex, nullptr); } ~BlockQueue() { pthread_cond_destroy(&full); pthread_cond_destroy(&empty); pthread_mutex_destroy(&mutex);}void PushData(T data) {pthread_mutex_lock(&mutex);while(IsFull()){cout<<"queue is full"<<endl;pthread_cond_signal(&empty);pthread_cond_wait(&full, &mutex);}_q.push(data);pthread_mutex_unlock(&mutex);}void PopData(T& data){pthread_mutex_lock(&mutex);while(IsEmpty()){cout<<"queue is empty"<<endl;pthread_cond_signal(&full);pthread_cond_wait(&empty, &mutex);}data = _q.front();_q.pop();pthread_mutex_unlock(&mutex);}private:bool IsFull(){return _q.size() >= _capacity;}bool IsEmpty(){return _q.empty();}queue<T> _q; size_t _capacity;pthread_cond_t full;pthread_cond_t empty;pthread_mutex_t mutex; };main.cpp
#include "blockqueue.h" void* ProducerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = rand()%100+1; p->PushData(data); cout<<"[producer] push data:"<<data<<endl; sleep(1); } } void* ConsumerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = 0; p->PopData(data); cout<<"[consumer] get data:"<<data<<endl; sleep(1); } } int main() {srand((unsigned int)time(nullptr));// 1、new一個阻塞隊列BlockQueue<int>* p = new BlockQueue<int>;// 2、創建兩個新線程,分別代表生產者和消費者pthread_t pro, con;pthread_create(&pro, nullptr, ProducerAction, p);pthread_create(&con, nullptr, ConsumerAction, p);// 3、主線程等待它們完成任務后負責銷毀阻塞隊列pthread_join(pro, nullptr);pthread_join(pro, nullptr);delete p;return 0; }5. 關于改進阻塞隊列的幾點補充
5.1 多生產者多消費者的設計
只有一個生產者和只有一個消費者的情況,只需在阻塞隊列push和pop時維護生產者和消費者的同步與互斥關系即可。如果有多個生產者和消費者的話需要在它們各自的控制流中加不同鎖和不同的條件變量,確保每次只有一個消費者和一個生產者能去操作隊列。
5.2 阻塞隊列所存儲數據可以是更復雜的任務
阻塞隊列不僅僅可以存簡單的整型數字,還可以是復雜任務的結構體指針,這樣生產者派發任務,消費者拿到后解決里面的任務。比如生產者派發用戶輸入的賬號密碼,消費者拿到后負責把賬號密碼傳輸到數據庫中。
四. 基于環形隊列的生產者消費者模型
1. 基本規則
2. 環形隊列的實現
成員變量說明:
- 這里用一個數組來模擬環形隊列,因為生產者和消費者要并發執行且不能同時操作相同位置的數據,剛好數組可以通過下標隨機訪問數據,所以這里我們選用數組。
- 定義了兩個無符號整型對象_proPos和_cusPos分別指向生產者要生產數據的格子下標和消費者要拿取數據的位置下標。
- 還定義了_proSem和_cusSem兩個信號量對象,分別記錄著環形隊列中格子數量和以生產數據個數。
- 最后還有必要記錄環形隊列的容量大小,可以用它來取模更新_proPos和_cusPos的值。
成員函數說明:
- 這里特意封裝了信號量的PV操作,只需把信號量對象作為參數傳入就能完成信號量的申請、釋放操作。
- 生產者執行Push()操作生產數據時,需要先申請(減一)_proSem信號量,生產完成后釋放(加一)_cusPos信號量,讓消費者來消費。反之亦然
2.3 單生產者單消費者
在主線程中創建兩個新線程分別代表生產者和消費者,消費者每隔一秒地從環形隊列中拿取數據,生產者每隔一秒生產一個數據:
// 基于環形隊列的單生產者單消費者模型 #include "RingQueue.h"// 消費者線程執行的操作 void* Customer(void* arg) {RingQueue<int>* q = (RingQueue<int>*)arg;while(true){sleep(1);int getData;q->Pop(getData);cout<<"[Customer] pop data:"<<getData<<endl;} }// 生產者線程執行的操作 void* Producer(void* arg) {RingQueue<int>* q = (RingQueue<int>*)arg;while(true){sleep(1);int putData = (rand()%100) + 1;q->Push(putData);cout<<"[Producer] push data:"<<putData<<endl;} }int main() { // 1、制造隨機數種子,作為生產者push到環形隊列當中的數據 srand((size_t)time(nullptr)); // 2、new一個環形隊列 RingQueue<int>* q = new RingQueue<int>; // 3、分別創建、等待一個生產者和一個消費者 pthread_t tid1, tid2; pthread_create(&tid1, nullptr, Customer, (void*)q); pthread_create(&tid2, nullptr, Producer, (void*)q); pthread_join(tid1, nullptr); pthread_join(tid2, nullptr); // 4、最后delete環形隊列 delete q; return 0; }編譯運行,由于_proSem初始值為0,一開始沒有數據生產者線程要掛起等待,消費者生產一個數據,生產者就拿取一個數據:
接下來我們讓生產者生產得快,消費者消費的慢:
編譯運行,發現生產者生產的數據瞬間把隊列填滿了,接下來消費者拿走一個數據,生產者再生產一個數據,二者串行執行:
如果消費者消費得快,生產者生產得慢的話,可以推測結果是生產者生產完一個數據,消費者馬上就拿走,然后繼續等待生產者生產數據,這個就不在做演示了。
2.4 多生產者多消費者
這次我們在主線程中分別新建三個生產者線程、三個消費者線程。生產者之間競爭proLock這把鎖,消費者之間競爭cusLock這把鎖,競爭到鎖的線程才能去生產或拿取數據,它們完成一次操作后釋放鎖,然后重新內部競爭:
// 基于環形隊列的多生產者多消費者模型 #include "RingQueue.h"// 構造兩個全局互斥鎖對象,分別用于所有生產者和所有消費者線程 pthread_mutex_t cusLock; pthread_mutex_t proLock;// new一個存儲整數的全局環形隊列 RingQueue<int>* q = new RingQueue<int>;// 消費者線程執行的操作 void* Customer(void* arg) {while(true){size_t id = (size_t)arg;int getData;pthread_mutex_lock(&cusLock);q->Pop(getData); pthread_mutex_unlock(&cusLock); cout<<'['<<"Customer "<<id<<']'<<" Pop data:"<<getData<<endl;sleep(1);} }// 生產者線程執行的操作 void* Producer(void* arg) {size_t id = (size_t)arg;while(true){int putData = (rand()%100) + 1;pthread_mutex_lock(&proLock);q->Push(putData);pthread_mutex_unlock(&proLock);cout<<'['<<"Producer "<<id<<']'<<" push data "<<putData<<endl;sleep(1);} }int main() {// 1、初始化兩把全局互斥鎖pthread_mutex_init(&cusLock, nullptr);pthread_mutex_init(&proLock, nullptr);// 2、創造種子,用于生產隨機數據插入到環形隊列中srand((size_t)time(nullptr));// 3、分別新建三個生產者、消費者線程pthread_t cusTids[3];pthread_t proTids[3];for(size_t i = 0; i < 3; ++i){pthread_create(&cusTids[i], nullptr, Customer, (void*)(i+1));}for(size_t i = 0; i < 3; ++i){pthread_create(&proTids[i], nullptr, Producer, (void*)(i+1)); }// 4、分別等待三個生產者、消費者線程for(size_t i = 0; i < 3; ++i){pthread_join(cusTids[i], nullptr);}for(size_t i = 0; i < 3; ++i){pthread_join(proTids[i], nullptr);}// 5、等待完成后delete環形隊列并銷毀互斥鎖對象delete q;pthread_mutex_destroy(&cusLock);pthread_mutex_destroy(&proLock);return 0; }編譯運行,生產和消費操作并發執行:
總結
以上是生活随笔為你收集整理的【Linux】生产者消费者模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【STM32】stm32独立看门狗(IW
- 下一篇: matlab ccd采集,CCD数据采集