日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達(dá)者為師

網(wǎng)站首頁 編程語言 正文

Linux信號量以及基于環(huán)形隊(duì)列的生產(chǎn)者消費(fèi)者模型

作者:CHJBL 更新時(shí)間: 2024-01-29 編程語言

文章目錄

  • 信號量
    • 信號量的接口
      • 初始化
      • 銷毀
      • 等待信號量
      • 發(fā)布信號量
  • 環(huán)形隊(duì)列
    • 結(jié)合信號量設(shè)計(jì)模型
  • 實(shí)現(xiàn)基于環(huán)形隊(duì)列的生產(chǎn)者消費(fèi)者模型
    • Task.hpp
    • RingQueue.hpp
    • main.cc
    • 效果
    • 對于多生產(chǎn)多消費(fèi)的情況

信號量

信號量的本質(zhì)是一個(gè)計(jì)數(shù)器

首先一份公共資源在實(shí)際情況中可能存在不同的線程可以并發(fā)訪問不同的區(qū)域。因此當(dāng)一個(gè)線程想要訪問臨界資源時(shí),可以先申請信號量,只要信號量申請成功就代表著這個(gè)線程在未來一定能訪問臨界資源的一部分。而這個(gè)申請信號量的本質(zhì)就是對臨界資源中特定的小塊資源進(jìn)行預(yù)定機(jī)制

所有的線程都能看到信號量,也就代表著信號量的本身就是公共資源如果信號量申請失敗,說明該線程并不滿足條件變量則線程進(jìn)入阻塞等待

信號量的操作可以稱為 PV操作,申請信號量成功則信號量的值減1(P),歸還資源后信號量的值加1(V)。在這過程中一定要保證原子性

信號量的接口

信號量的類型為:sem_t

初始化

定義一個(gè)信號量并初始化:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsing int value);

參數(shù)一為信號量的地址

參數(shù)二如果設(shè)為0則表示線程間共享,非0則表示進(jìn)程間共享

參數(shù)三為信號量的初始值,也就是這個(gè)計(jì)數(shù)器的最大值

初始化成功返回0

銷毀

int sem_destroy(sem_t *sem);

參數(shù)為信號量的地址

等待信號量

也就是申請信號量,等待成功則信號量的值減1

int sem_wait(sem_t *sem);

參數(shù)為信號量的地址

發(fā)布信號量

也就是歸還信號量,發(fā)布成功則信號量的值加1

int sem_post(sem_t *sem);

參數(shù)為信號量的地址

環(huán)形隊(duì)列

將生產(chǎn)者消費(fèi)者模型放到環(huán)形隊(duì)列里該如何理解呢?

首先環(huán)形隊(duì)列可以理解為就是指一個(gè)循環(huán)的數(shù)組。而對于生產(chǎn)者和消費(fèi)者而言,生產(chǎn)者負(fù)責(zé)往這個(gè)數(shù)組中放入數(shù)據(jù),消費(fèi)者負(fù)責(zé)從這個(gè)數(shù)組中拿取數(shù)據(jù)

對于生產(chǎn)者而言:只有環(huán)形隊(duì)列中有空的空間時(shí)才可以放數(shù)據(jù)

對于消費(fèi)者而言:環(huán)形隊(duì)列中必須要有不為空的空間時(shí)才能拿數(shù)據(jù)

那么肯定是生產(chǎn)者放入數(shù)據(jù)后消費(fèi)者才能拿數(shù)據(jù),因此在喚醒隊(duì)列中消費(fèi)者永遠(yuǎn)都不能超過生產(chǎn)者

假如生產(chǎn)者和消費(fèi)者都指向了同一塊空間時(shí),要么隊(duì)列滿了,要么隊(duì)列為空

如果隊(duì)列滿了,那么消費(fèi)者就必須要先運(yùn)行拿走數(shù)據(jù)后生產(chǎn)者才可以運(yùn)行

如果隊(duì)列為空,那么生產(chǎn)者必須放入數(shù)據(jù)后消費(fèi)者才能運(yùn)行拿數(shù)據(jù)

如果為其他情況,說明生產(chǎn)者和消費(fèi)者所指向的空間是不一樣的

image-20230717154905990

結(jié)合信號量設(shè)計(jì)模型

那么根據(jù)上述的消費(fèi)者和生產(chǎn)者不同的特性,結(jié)合信號量為兩者設(shè)計(jì)條件

對于生產(chǎn)者而言看中的是隊(duì)列中剩余的空間,那么可以給生產(chǎn)者設(shè)置信號量為隊(duì)列的總?cè)萘?/p>

對于消費(fèi)者而言看中的是隊(duì)列中不為空的空間,則可以給消費(fèi)者初始的信號量值設(shè)為0

那么對于生產(chǎn)過程而言:

  1. 首先生產(chǎn)者去申請信號量也就是 P 操作
  2. 申請成功則往下繼續(xù)生產(chǎn)操作的執(zhí)行,申請失敗則阻塞等待
  3. 執(zhí)行完生產(chǎn)操作后,V操作,注意此時(shí)的V操作并不是對生產(chǎn)者的信號量操作,而是對消費(fèi)者的信號量操作。因?yàn)榇藭r(shí)隊(duì)列中已經(jīng)有了不為空的空間,所以消費(fèi)者的信號量就可以進(jìn)行加1操作,這樣消費(fèi)者才能申請成功信號量

對于消費(fèi)過程而言:

  1. 消費(fèi)者申請信號量
  2. 申請成功則往下繼續(xù)消費(fèi)操作的執(zhí)行,失敗則阻塞等待
  3. 執(zhí)行完消費(fèi)操作后,V操作,注意此時(shí)V操作是對生產(chǎn)者的信號量操作,因?yàn)橄M(fèi)完成后,隊(duì)列里就多了一個(gè)空的空間,生產(chǎn)者的信號量就可以進(jìn)行加1操作

那么對于生產(chǎn)者和消費(fèi)者的位置其實(shí)就是隊(duì)列中的下標(biāo),并且一定是有兩個(gè)下標(biāo),如果隊(duì)列為空為滿,那么兩者的位置相同

實(shí)現(xiàn)基于環(huán)形隊(duì)列的生產(chǎn)者消費(fèi)者模型

Task.hpp

#include <iostream>
#include <string>
#include <functional>
#include <cstdio>

// 負(fù)責(zé)計(jì)算的任務(wù)類
class CPTask
{
    // 調(diào)用的計(jì)算方法,根據(jù)傳入的字符參數(shù)決定
    typedef std::function<int(int, int, char)> func_t;

public:
    CPTask()
    {
    }

    CPTask(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _func(func)
    {
    }

    // 實(shí)現(xiàn)傳入的函數(shù)調(diào)用
    std::string operator()()
    {
        int count = _func(_x, _y, _op);

        // 將結(jié)果以自定義的字符串形式返回
        char res[2048];
        snprintf(res, sizeof res, "%d %c %d = %d", _x, _op, _y, count);
        return res;
    }

    // 顯示出當(dāng)前傳入的參數(shù)
    std::string tostring()
    {
        char res[1024];
        snprintf(res, sizeof res, "%d %c %d = ", _x, _op, _y);
        return res;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _func;
};

// 負(fù)責(zé)計(jì)算的任務(wù)函數(shù)
int Math(int x, int y, char c)
{
    int count;
    switch (c)
    {
    case '+':
        count = x + y;
        break;
    case '-':
        count = x - y;
        break;
    case '*':
        count = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cout << "div zero" << std::endl;
            count = -1;
        }
        else
            count = x / y;
        break;
    }
    default:
        break;
    }

    return count;
}

class SCTask
{
    // 獲取保存數(shù)據(jù)的方法
    typedef std::function<void(std::string)> func_t;

public:
    SCTask()
    {
    }

    SCTask(const std::string &str, func_t func)
        : _str(str), _func(func)
    {
    }

    void operator()()
    {
        _func(_str);
    }

private:
    std::string _str;
    func_t _func;
};

RingQueue.hpp

#pragma once

#include <iostream>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <string>
#include <ctime>
#include <unistd.h>
#include <cassert>

using namespace std;

template <class T>
class RingQueue
{
public:
    RingQueue(const int size = 10)
        : _size(size)
    {
        // 初始化信號量以及數(shù)組大小
        // 生產(chǎn)者的信號量初始為數(shù)組大小
        // 消費(fèi)者的信號量初始為0
        _Rqueue.resize(_size);
        assert(sem_init(&_ps, 0, _size) == 0);
        assert(sem_init(&_cs, 0, 0) == 0);
    }

    void push(const T &in)
    {
        // 生產(chǎn)者申請信號量
        assert(sem_wait(&_ps) == 0);

        // 為了模擬環(huán)形,下標(biāo)需要模等于數(shù)組的大小
        // 保證下標(biāo)不越界
        _Rqueue[_Pindex++] = in;
        _Pindex %= _size;

        // 消費(fèi)者發(fā)布信號量
        assert(sem_post(&_cs) == 0);
    }

    void pop(T *out)
    {
        // 消費(fèi)者申請信號量
        assert(sem_wait(&_cs) == 0);

        // 為了模擬環(huán)形,下標(biāo)需要模等于數(shù)組的大小
        // 保證下標(biāo)不越界
        *out = _Rqueue[_Cindex++];
        _Cindex %= _size;

        // 生產(chǎn)者發(fā)布信號量
        assert(sem_post(&_ps) == 0);
    }

    ~RingQueue()
    {
        sem_destroy(&_ps);
        sem_destroy(&_cs);
    }

private:
    vector<T> _Rqueue;
    int _size;       // 數(shù)組的容量大小 
    sem_t _ps;       // 生產(chǎn)者的信號量
    sem_t _cs;       // 消費(fèi)者的信號量
    int _Pindex = 0; // 生產(chǎn)者的下標(biāo)
    int _Cindex = 0; // 消費(fèi)者的下標(biāo)
};

main.cc

#include "RingQueue.hpp"
#include "Task.hpp"

void *Pplay(void *rp)
{
    RingQueue<CPTask> *rq = (RingQueue<CPTask> *)rp;

    while (1)
    {
        string s("+-*/");
        // 隨機(jī)產(chǎn)生數(shù)據(jù)插入
        int x = rand() % 100 + 1;
        int y = rand() % 100 + 1;
        // 隨機(jī)提取+-*/
        int i = rand() % s.size();
        // 定義好實(shí)現(xiàn)類的對象
        CPTask c(x, y, s[i], Math);

        // 將整個(gè)對象插入到計(jì)算隊(duì)列中
        rq->push(c);
        cout << "生產(chǎn)數(shù)據(jù)完成: " << c.tostring() << endl;
        sleep(1);
    }
}

void *Cplay(void *cp)
{
    RingQueue<CPTask> *rq = (RingQueue<CPTask> *)cp;

    while (1)
    {
        // 隊(duì)列拿出數(shù)據(jù)
        CPTask c;
        rq->pop(&c);
        // 調(diào)用拿出的對象獲取最終的結(jié)果
        string s = c();

        cout << "消費(fèi)完成,取出的數(shù)據(jù)為: " << s << endl;
        sleep(2);
    }
}

int main()
{
    srand(time(nullptr));

    RingQueue<CPTask> *rq = new RingQueue<CPTask>();

    pthread_t p, c;
    pthread_create(&p, nullptr, Pplay, (void *)rq);
    pthread_create(&c, nullptr, Cplay, (void *)rq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    delete rq;

    return 0;
}

效果

image-20230717171143701

對于多生產(chǎn)多消費(fèi)的情況

如果現(xiàn)在有多個(gè)生產(chǎn)和多個(gè)消費(fèi),那么就要保證臨界資源的安全,這時(shí)候就得加鎖。

加鎖可以實(shí)現(xiàn)在申請信號量之后,因?yàn)樯暾埿盘柫繉?shí)際上就是一個(gè)原子性的操作,并不會(huì)因?yàn)槎嗑€程而導(dǎo)致沖突。當(dāng)線程申請好了各自的信號量之后再往下運(yùn)行加鎖,就不用讓所有線程都等待在加鎖前。而解鎖同樣可以在發(fā)布信號量之后。

原文鏈接:https://blog.csdn.net/CHJBL/article/details/134411783

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新