網(wǎng)站首頁 編程語言 正文
文章目錄
- 信號量
- 信號量的接口
- 初始化
- 銷毀
- 等待信號量
- 發(fā)布信號量
- 環(huán)形隊(duì)列
- 結(jié)合信號量設(shè)計(jì)模型
- 實(shí)現(xiàn)基于環(huán)形隊(duì)列的生產(chǎn)者消費(fèi)者模型
- Task.hpp
- RingQueue.hpp
- main.cc
- 效果
- 對于多生產(chǎn)多消費(fèi)的情況
信號量
信號量的本質(zhì)是一個(gè)計(jì)數(shù)器
首先一份公共資源在實(shí)際情況中可能存在不同的線程可以并發(fā)訪問不同的區(qū)域。因此當(dāng)一個(gè)線程想要訪問臨界資源時(shí),可以先申請信號量,只要信號量申請成功就代表著這個(gè)線程在未來一定能訪問臨界資源的一部分。而這個(gè)申請信號量的本質(zhì)就是對臨界資源中特定的小塊資源進(jìn)行預(yù)定機(jī)制
所有的線程都能看到信號量,也就代表著信號量的本身就是公共資源。如果信號量申請失敗,說明該線程并不滿足條件變量則線程進(jìn)入阻塞等待。
信號量的操作可以稱為 PV操作,申請信號量成功則信號量的值減1(P),歸還資源后信號量的值加1(V)。在這過程中一定要保證原子性
信號量的接口
信號量的類型為:sem_t
初始化
定義一個(gè)信號量并初始化:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsing int value);
參數(shù)一為信號量的地址
參數(shù)二如果設(shè)為0則表示線程間共享,非0則表示進(jìn)程間共享
參數(shù)三為信號量的初始值,也就是這個(gè)計(jì)數(shù)器的最大值
初始化成功返回0
銷毀
int sem_destroy(sem_t *sem);
參數(shù)為信號量的地址
等待信號量
也就是申請信號量,等待成功則信號量的值減1
int sem_wait(sem_t *sem);
參數(shù)為信號量的地址
發(fā)布信號量
也就是歸還信號量,發(fā)布成功則信號量的值加1
int sem_post(sem_t *sem);
參數(shù)為信號量的地址
環(huán)形隊(duì)列
將生產(chǎn)者消費(fèi)者模型放到環(huán)形隊(duì)列里該如何理解呢?
首先環(huán)形隊(duì)列可以理解為就是指一個(gè)循環(huán)的數(shù)組。而對于生產(chǎn)者和消費(fèi)者而言,生產(chǎn)者負(fù)責(zé)往這個(gè)數(shù)組中放入數(shù)據(jù),消費(fèi)者負(fù)責(zé)從這個(gè)數(shù)組中拿取數(shù)據(jù)。
對于生產(chǎn)者而言:只有環(huán)形隊(duì)列中有空的空間時(shí)才可以放數(shù)據(jù)
對于消費(fèi)者而言:環(huán)形隊(duì)列中必須要有不為空的空間時(shí)才能拿數(shù)據(jù)
那么肯定是生產(chǎn)者放入數(shù)據(jù)后消費(fèi)者才能拿數(shù)據(jù),因此在喚醒隊(duì)列中消費(fèi)者永遠(yuǎn)都不能超過生產(chǎn)者
假如生產(chǎn)者和消費(fèi)者都指向了同一塊空間時(shí),要么隊(duì)列滿了,要么隊(duì)列為空
如果隊(duì)列滿了,那么消費(fèi)者就必須要先運(yùn)行拿走數(shù)據(jù)后生產(chǎn)者才可以運(yùn)行
如果隊(duì)列為空,那么生產(chǎn)者必須放入數(shù)據(jù)后消費(fèi)者才能運(yùn)行拿數(shù)據(jù)
如果為其他情況,說明生產(chǎn)者和消費(fèi)者所指向的空間是不一樣的
結(jié)合信號量設(shè)計(jì)模型
那么根據(jù)上述的消費(fèi)者和生產(chǎn)者不同的特性,結(jié)合信號量為兩者設(shè)計(jì)條件
對于生產(chǎn)者而言看中的是隊(duì)列中剩余的空間,那么可以給生產(chǎn)者設(shè)置信號量為隊(duì)列的總?cè)萘?/p>
對于消費(fèi)者而言看中的是隊(duì)列中不為空的空間,則可以給消費(fèi)者初始的信號量值設(shè)為0
那么對于生產(chǎn)過程而言:
- 首先生產(chǎn)者去申請信號量也就是 P 操作
- 申請成功則往下繼續(xù)生產(chǎn)操作的執(zhí)行,申請失敗則阻塞等待
- 執(zhí)行完生產(chǎn)操作后,V操作,注意此時(shí)的V操作并不是對生產(chǎn)者的信號量操作,而是對消費(fèi)者的信號量操作。因?yàn)榇藭r(shí)隊(duì)列中已經(jīng)有了不為空的空間,所以消費(fèi)者的信號量就可以進(jìn)行加1操作,這樣消費(fèi)者才能申請成功信號量
對于消費(fèi)過程而言:
- 消費(fèi)者申請信號量
- 申請成功則往下繼續(xù)消費(fèi)操作的執(zhí)行,失敗則阻塞等待
- 執(zhí)行完消費(fèi)操作后,V操作,注意此時(shí)V操作是對生產(chǎn)者的信號量操作,因?yàn)橄M(fèi)完成后,隊(duì)列里就多了一個(gè)空的空間,生產(chǎn)者的信號量就可以進(jìn)行加1操作
那么對于生產(chǎn)者和消費(fèi)者的位置其實(shí)就是隊(duì)列中的下標(biāo),并且一定是有兩個(gè)下標(biāo),如果隊(duì)列為空為滿,那么兩者的位置相同
實(shí)現(xiàn)基于環(huán)形隊(duì)列的生產(chǎn)者消費(fèi)者模型
Task.hpp
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// 負(fù)責(zé)計(jì)算的任務(wù)類
class CPTask
{
// 調(diào)用的計(jì)算方法,根據(jù)傳入的字符參數(shù)決定
typedef std::function<int(int, int, char)> func_t;
public:
CPTask()
{
}
CPTask(int x, int y, char op, func_t func)
: _x(x), _y(y), _op(op), _func(func)
{
}
// 實(shí)現(xiàn)傳入的函數(shù)調(diào)用
std::string operator()()
{
int count = _func(_x, _y, _op);
// 將結(jié)果以自定義的字符串形式返回
char res[2048];
snprintf(res, sizeof res, "%d %c %d = %d", _x, _op, _y, count);
return res;
}
// 顯示出當(dāng)前傳入的參數(shù)
std::string tostring()
{
char res[1024];
snprintf(res, sizeof res, "%d %c %d = ", _x, _op, _y);
return res;
}
private:
int _x;
int _y;
char _op;
func_t _func;
};
// 負(fù)責(zé)計(jì)算的任務(wù)函數(shù)
int Math(int x, int y, char c)
{
int count;
switch (c)
{
case '+':
count = x + y;
break;
case '-':
count = x - y;
break;
case '*':
count = x * y;
break;
case '/':
{
if (y == 0)
{
std::cout << "div zero" << std::endl;
count = -1;
}
else
count = x / y;
break;
}
default:
break;
}
return count;
}
class SCTask
{
// 獲取保存數(shù)據(jù)的方法
typedef std::function<void(std::string)> func_t;
public:
SCTask()
{
}
SCTask(const std::string &str, func_t func)
: _str(str), _func(func)
{
}
void operator()()
{
_func(_str);
}
private:
std::string _str;
func_t _func;
};
RingQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <string>
#include <ctime>
#include <unistd.h>
#include <cassert>
using namespace std;
template <class T>
class RingQueue
{
public:
RingQueue(const int size = 10)
: _size(size)
{
// 初始化信號量以及數(shù)組大小
// 生產(chǎn)者的信號量初始為數(shù)組大小
// 消費(fèi)者的信號量初始為0
_Rqueue.resize(_size);
assert(sem_init(&_ps, 0, _size) == 0);
assert(sem_init(&_cs, 0, 0) == 0);
}
void push(const T &in)
{
// 生產(chǎn)者申請信號量
assert(sem_wait(&_ps) == 0);
// 為了模擬環(huán)形,下標(biāo)需要模等于數(shù)組的大小
// 保證下標(biāo)不越界
_Rqueue[_Pindex++] = in;
_Pindex %= _size;
// 消費(fèi)者發(fā)布信號量
assert(sem_post(&_cs) == 0);
}
void pop(T *out)
{
// 消費(fèi)者申請信號量
assert(sem_wait(&_cs) == 0);
// 為了模擬環(huán)形,下標(biāo)需要模等于數(shù)組的大小
// 保證下標(biāo)不越界
*out = _Rqueue[_Cindex++];
_Cindex %= _size;
// 生產(chǎn)者發(fā)布信號量
assert(sem_post(&_ps) == 0);
}
~RingQueue()
{
sem_destroy(&_ps);
sem_destroy(&_cs);
}
private:
vector<T> _Rqueue;
int _size; // 數(shù)組的容量大小
sem_t _ps; // 生產(chǎn)者的信號量
sem_t _cs; // 消費(fèi)者的信號量
int _Pindex = 0; // 生產(chǎn)者的下標(biāo)
int _Cindex = 0; // 消費(fèi)者的下標(biāo)
};
main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
void *Pplay(void *rp)
{
RingQueue<CPTask> *rq = (RingQueue<CPTask> *)rp;
while (1)
{
string s("+-*/");
// 隨機(jī)產(chǎn)生數(shù)據(jù)插入
int x = rand() % 100 + 1;
int y = rand() % 100 + 1;
// 隨機(jī)提取+-*/
int i = rand() % s.size();
// 定義好實(shí)現(xiàn)類的對象
CPTask c(x, y, s[i], Math);
// 將整個(gè)對象插入到計(jì)算隊(duì)列中
rq->push(c);
cout << "生產(chǎn)數(shù)據(jù)完成: " << c.tostring() << endl;
sleep(1);
}
}
void *Cplay(void *cp)
{
RingQueue<CPTask> *rq = (RingQueue<CPTask> *)cp;
while (1)
{
// 隊(duì)列拿出數(shù)據(jù)
CPTask c;
rq->pop(&c);
// 調(diào)用拿出的對象獲取最終的結(jié)果
string s = c();
cout << "消費(fèi)完成,取出的數(shù)據(jù)為: " << s << endl;
sleep(2);
}
}
int main()
{
srand(time(nullptr));
RingQueue<CPTask> *rq = new RingQueue<CPTask>();
pthread_t p, c;
pthread_create(&p, nullptr, Pplay, (void *)rq);
pthread_create(&c, nullptr, Cplay, (void *)rq);
pthread_join(p, nullptr);
pthread_join(c, nullptr);
delete rq;
return 0;
}
效果
對于多生產(chǎn)多消費(fèi)的情況
如果現(xiàn)在有多個(gè)生產(chǎn)和多個(gè)消費(fèi),那么就要保證臨界資源的安全,這時(shí)候就得加鎖。
加鎖可以實(shí)現(xiàn)在申請信號量之后,因?yàn)樯暾埿盘柫繉?shí)際上就是一個(gè)原子性的操作,并不會(huì)因?yàn)槎嗑€程而導(dǎo)致沖突。當(dāng)線程申請好了各自的信號量之后再往下運(yùn)行加鎖,就不用讓所有線程都等待在加鎖前。而解鎖同樣可以在發(fā)布信號量之后。
原文鏈接:https://blog.csdn.net/CHJBL/article/details/134411783
- 上一篇:沒有了
- 下一篇:沒有了
相關(guān)推薦
- 2022-09-16 詳解C++?中?shared_ptr?weak_ptr_C 語言
- 2022-07-12 Linux命令之美|linux使用tar誤解壓之后,如何刪除解壓后的文件
- 2022-04-18 騰訊im中調(diào)用 setMessageRead 會(huì)話列表中的未讀消息還在存在
- 2022-11-23 C語言學(xué)習(xí)之關(guān)鍵字的示例詳解_C 語言
- 2022-05-16 C#實(shí)現(xiàn)的4種常用數(shù)據(jù)校驗(yàn)方法小結(jié)(CRC校驗(yàn),LRC校驗(yàn),BCC校驗(yàn),累加和校驗(yàn))_C#教程
- 2022-12-11 Redhat持久化日志實(shí)戰(zhàn)示例詳解_相關(guān)技巧
- 2022-11-06 SQL?Server?Reporting?Services?匿名登錄的問題及解決方案_MsSql
- 2022-03-15 feign.RetryableException: Read timed out executing
- 欄目分類
-
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支