日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

C++線程安全容器stack和queue的使用詳細介紹_C 語言

作者:TTOR ? 更新時間: 2022-10-19 編程語言

要構建線程安全的數據結構, 關注幾點:

  • 若某線程破壞了數據結構的不變量, 保證其他線程不能看到
  • 提供的操作應該完整,獨立, 而非零散的分解步驟避免函數接口固有的條件競爭(比如之前提到的empty和top和pop)

線程安全的容器棧threadsafe_stack

入門(3)里曾介紹過線程安全的stack容器, 這里把代碼搬過來再分析

逐項分析, 該代碼如何實現線程安全的

template<typename T>
class threadsafe_stack
{
private:
    stack<T> data;
    mutable mutex m;
public:
    threadsafe_stack(){}
    threadsafe_stack(const threadsafe_stack &other)
    {
        lock_guard lock1(other.m);
        data=other.data;
    }
    threadsafe_stack &operator=(const threadsafe_stack &) = delete;
    void push(T new_value)
    {
        lock_guard lock1(m);
        data.push(move(new_value));      //1
    }
    shared_ptr<T> pop()
    {
        lock_guard lock1(m);
        if (data.empty())
        {
            throw empty_stack();        //2
        }
        shared_ptr<T> const
                res(make_shared<T>(move(data.top()))); //3
        data.pop();                                     //4
        return res;
    }
    void pop(T &value)
    {
        lock_guard lock1(m);
        if (data.empty())
        {
            throw empty_stack();
        }
        value = move(data.top());    //5
        data.pop();                 //6
    }
    bool empty() const //7
    {
        lock_guard lock1(m);
        return data.empty();
    }
};

首先, 每個操作都對互斥加鎖, 保證基本線程安全

其次, 在多線程下, 對于std::stack容器, empty(), top(), pop()存在接口上的數據競爭(見入門(3)說明), 于是threadsafe_stack把這些調用集合到一個函數pop()里, 以實現線程安全. 其中pop()函數里若與遇??? 直接拋出異常

接著分析:

1處data.push()可能拋出異常: 原因是復制/移動時拋出異常或stack容器擴展容量時遇上內存分配不足, 但無論哪種, std::stack<>能保證自身的安全

2處拋出的異常: 沒有改動數據, 安全的拋出行為

3處共享指針的創建可能拋出異常: 內存不足或移動/復制相關的構造函數拋出異常,但兩種情形c++都能保證不會出現內存泄漏, 并且此時數據還未改動(data.pop()時才改動),

4處data.pop()的實質操作是返回結果, 絕不會拋出異常,結合3, 所以這是異常安全的重載函數pop()

5,6處和3,4處類似, 不同之處是沒用創建新共享指針, 但此時數據也沒被改動, 也是安全的重載函數pop()

最后7處empty()不改動任何數據, 是異常安全的函數

從內存和數據結構安全方面來說沒用問題

然而,這段代碼可能造成死鎖:

因為在持鎖期間, 有可能執行以下用戶自定義函數:

用戶自定義的復制構造函數(1 3處的res構造), 移動構造函數(3處的make_share), 拷貝賦值操作和移動賦值操作(5處), 用戶也可能重載了new和delete.

當在這些函數里, 若是再次調用了同個棧的相關函數, 會再次申請獲取鎖, 然而之前的鎖還沒釋放, 因此造成死鎖

以下是我想到的一種死鎖方式(正常情況應該不會這么寫, 但是設計時必須要考慮)

class A;
threadsafe_stack<A> s;
class A
{
public:
    A(A&& a)//2->然后這里使用s.pop(),之前鎖沒釋放, 造成了死鎖
    {
        s.pop();
    }
    A(){}
};
int main()
{
    s.push(A()); //1->臨時對象A()在s.push()里被move進內置data時, 會調用A的移動構造函數
    return 0;
}

向棧添加/移除數據, 不可能不涉及復制行為或內存行為, 于是只能對棧的使用者提出要求: 讓使用者來保證避免死鎖

棧的各成員函數都有lock_guard保護數據, 因此同時調用的線程沒有數量限制.

僅有構造函數和析構函數不是安全行為, 但無論是沒構造完成還是銷毀到一半, 從而轉去調用成員函數, 這在有無并發情況下都是不正確的.

所以, 使用者必須保證: 棧容器未構造完成時不能訪問數據, 只有全部線程都停止訪問時, 才可銷毀容器

線程安全的容器隊列threadsafe_queue

自定義一個threadsafe_queue, 并且上面對于線程安全的大多數分析在這也成立

template<typename T>
class threadsafe_queue
{
private:
    queue<T> data;
    mutable mutex m;
    condition_variable condition;
public:
    threadsafe_queue()
    {}
    threadsafe_queue(const threadsafe_queue &other)
    {
        lock_guard lock1(other.m);
        data = other.data;
    }
    threadsafe_queue &operator=(const threadsafe_queue &) = delete;
    void push(T new_value)
    {
        lock_guard lock1(m);
        data.push(move(new_value));
        condition.notify_one();   //1
    }
    void wait_and_pop(T &value)      //2
    {
        lock_guard lock1(m);
        condition.wait(lock1, [this]
        {
            return !data.empty();
        });
        value = move(data.top());
        data.pop();
    }
    shared_ptr<T> wait_and_pop()       //3
    {
        lock_guard lock1(m);
        condition.wait(lock1, [this]
        {
            return !data.empty();
        });
        shared_ptr<T> const
                res(make_shared<T>(move(data.top()))); //4 創建shared_ptr可能出現異常
        data.pop();
        return res;
    }
    shared_ptr<T> try_pop()
    {
        lock_guard lock1(m);
        if (data.empty())
        {
            return shared_ptr<T>();     //5
        }
        shared_ptr<T> const
                res(make_shared<T>(move(data.top())));
        data.pop();
        return res;
    }
    bool try_pop(T &value)
    {
        lock_guard lock1(m);
        if (data.empty())
        {
            return false;
        }
        value = move(data.top());
        data.pop();
    }
    bool empty() const
    {
        lock_guard lock1(m);
        return data.empty();
    }
};

區別:

發現隊列通常用于消費者/生產者模型, 因此實現阻塞的取值函數wait_and_pop, 即當調用時隊列若空, 阻塞等待, 直到push數據后調用condition.notify_one()

同時也提供了非阻塞的取值函數try_pop

然而這一實現會有問題:

假如有多個線程同時等待, condition.notify_one()只能喚醒其中一個,若該喚醒的線程執行wait_and_pop之后的代碼拋出異常(例如4處res的創建), 此時隊列里還有數據,卻不會有其他任何線程被喚

如果我們因不能接受這種行為方式, 而只是簡單的把notify_one改為notify_all,這樣每次push數據后都會喚醒所有的等待線程. 由于只push了1個數據, 大多數線程醒來后發現隊列還是為空, 還得繼續等待, 這將大大增加開銷

第二種解決種方法是若wait_and_pop拋出異常則再次調用notify_one

第三種方法是讓std::queue存儲share_ptr<T>, share_ptr的初始化移動到push的調用處, 從內部復制shared_ptr<>實例則不會拋出異常

這里采用第三種方法, 還會有額外的好處: push里為shared_ptr分配內存操作在加鎖之前, 縮短了互斥加鎖的時間, 由于分配內存通常是耗時的操作, 因此這樣非常有利于增強性能

template<typename T>
class threadsafe_queue
{
private:
    queue<shared_ptr<T>> data;
    mutable mutex m;
    condition_variable condition;
public:
    threadsafe_queue()
    {}
    threadsafe_queue(const threadsafe_queue &other)
    {
        lock_guard lock1(other.m);
        data = other.data;
    }
    threadsafe_queue &operator=(const threadsafe_queue &) = delete;
    void push(T new_value)
    {
        //分配內存在加鎖操作之前
        shared_ptr<T> value(make_shared<T>(move(new_value)));
        lock_guard lock1(m);
        data.push(value);
        condition.notify_one();
    }
    void wait_and_pop(T &value)
    {
        lock_guard lock1(m);
        condition.wait(lock1, [this]
        {
            return !data.empty();   //隊列空則等待
        });
        value = move(*data.front()); //先取值, 再存入參數value
        data.pop();
    }
    bool try_pop(T &value)
    {
        lock_guard lock1(m);
        if (data.empty())
        {
            return false;       //隊列空返回false
        }
        value = move(*data.front()); //先取值, 再存入參數value
        data.pop();
        return true;
    }
    shared_ptr<T> wait_and_pop()
    {
        lock_guard lock1(m);
        condition.wait(lock1, [this]
        {
            return !data.empty();    //隊列空則等待
        });
        shared_ptr<T> res = data.front(); //取出結果返回給外部
        data.pop();
        return res;
    }
    shared_ptr<T> try_pop()
    {
        lock_guard lock1(m);
        if (data.empty())
        {
            return shared_ptr<T>();     //隊列空返回空shared_ptr
        }
        shared_ptr<T> res = data.front();//取出結果返回給外部
        data.pop();
        return res;
    }
    bool empty() const
    {
        lock_guard lock1(m);
        return data.empty();
    }
};

原文鏈接:https://blog.csdn.net/TOPEE362/article/details/126222582

欄目分類
最近更新