網站首頁 編程語言 正文
基于循環數組的無鎖隊列
- 類接口和變量
- CAS的使用
- enqueue入隊列
- dequeue出隊列
- 在多于一個生產者線程的情況下“讓出”CPU的必要性
- 源碼
- 源碼測試
- 總結
- 后言
類接口和變量
#ifndef _ARRAYLOCKFREEQUEUE_H___
#define _ARRAYLOCKFREEQUEUE_H___
#include <stdint.h>
#ifdef _WIN64
#define QUEUE_INT int64_t
#else
#define QUEUE_INT unsigned long
#endif
#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16
template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
class ArrayLockFreeQueue
{
public:
ArrayLockFreeQueue();
virtual ~ArrayLockFreeQueue();
QUEUE_INT size();
bool enqueue(const ELEM_T &a_data);//入隊
bool dequeue(ELEM_T &a_data);// 出隊
bool try_dequeue(ELEM_T &a_data);//嘗試入隊
private:
ELEM_T m_thequeue[Q_SIZE];
volatile QUEUE_INT m_count;//隊列的元素個數
volatile QUEUE_INT m_writeIndex;//新元素入隊時存放位置在數組中的下標
volatile QUEUE_INT m_readIndex;//下一個出隊元素在數組中的下標
volatile QUEUE_INT m_maximumReadIndex;// 最后一個已經完成入隊操作的元素在數組中的下標
inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};
#include "ArrayLockFreeQueueImp.h"
#endif
m_maximumReadIndex: 最后一個已經完成入列操作的元素在數組中的下標。如果它的值跟m_writeIndex不一致,表明有寫請求尚未完成。這意味著,有寫請求成功申請了空間但數據還沒完全寫進隊列。所以如果有線程要讀取,必須要等到寫線程將數據完全寫入到隊列之后。
必須指明的是使用3種不同的下標都是必須的,因為隊列允許任意數量的生產者和消費者圍繞著它工作。
數組環形圖:
CAS的使用
使用gcc內置的syn_bool_compare_and_swap,但重新做了宏定義封裝。
#ifndef _ATOM_OPT_H___
#define _ATOM_OPT_H___
#ifdef __GNUC__
#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count)
#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count)
#include <sched.h> // sched_yield()
#else
#include <Windows.h>
#ifdef _WIN64
#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange64(a_ptr, a_newVal, a_oldVal))
#define sched_yield() SwitchToThread()
#define AtomicAdd(a_ptr, num) InterlockedIncrement64(a_ptr)
#define AtomicSub(a_ptr, num) InterlockedDecrement64(a_ptr)
#else
#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange(a_ptr, a_newVal, a_oldVal))
#define sched_yield() SwitchToThread()
#define AtomicAdd(a_ptr, num) InterlockedIncrement(a_ptr)
#define AtomicSub(a_ptr, num) InterlockedDecrement(a_ptr)
#endif
#endif
#endif
enqueue入隊列
template <typename ELEM_T, QUEUE_INT Q_SIZE>
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
{
return (a_count % Q_SIZE); // 取余的時候
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
{
QUEUE_INT currentWriteIndex; // 獲取寫指針的位置
QUEUE_INT currentReadIndex;
// 1. 獲取可寫入的位置
do
{
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if(countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex))
{
return false; // 隊列已經滿了
}
// 目的是為了獲取一個能寫入的位置
} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
// 獲取寫入位置后 currentWriteIndex 是一個臨時變量,保存我們寫入的位置
// We know now that this index is reserved for us. Use it to save the data
m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把數據更新到對應的位置
// 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
{
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield(); // 當線程超過cpu核數的時候如果不讓出cpu導致一直循環在此。
}
AtomicAdd(&m_count, 1);
return true;
}
分析:
(1)對于下圖,隊列中存放了兩個元素。WriteIndex指示的位置是新元素將會被插入的位置。ReadIndex指向的位置中的元素將會在下一次pop操作中被彈出。
(2)當生產者準備將數據插入到隊列中,它首先通過增加WriteIndex的值來申請空間。MaximumReadIndex指向最后一個存放有效數據的位置(也就是實際的隊列尾)。
(3)一旦空間的申請完成,生產者就可以將數據拷貝到剛剛申請到的位置中。完成之后增加MaximumReadIndex使得它與WriteIndex的一致。
(4)現在隊列中有3個元素,接著又有一個生產者嘗試向隊列中插入元素。
(5)在第一個生產者完成數據拷貝之前,又有另外一個生產者申請了一個新的空間準備拷貝數據。現在有兩個生產者同時向隊列插入數據。
(6)現在生產者開始拷貝數據,在完成拷貝之后,對MaximumReadIndex的遞增操作必須嚴格遵循一個順序:第一個生產者線程首先遞增MaximumReadIndex,接著才輪到第二個生產者。 這個順序必須被嚴格遵守的原因是,我們必須保證數據被完全拷貝到隊列之后才允許消費者線程將其出列。
第一個生產者完成了數據拷貝,并對MaximumReadIndex完成了遞增。
(7)現在第二個生產者可以遞增MaximumReadIndex了;第二個生產者完成了對MaximumReadIndex的遞增,現在隊列中有5個元素。
dequeue出隊列
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
QUEUE_INT currentMaximumReadIndex;
QUEUE_INT currentReadIndex;
do
{
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;
if(countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的
// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
AtomicSub(&m_count, 1); // 真正讀取到了數據,元素-1
return true;
}
} while(true);
assert(0);
// Add this return statement to avoid compiler warnings
return false;
}
分析:
(1)以下插圖展示了元素出列的時候各種下標是如何變化的,隊列中初始有2個元素。WriteIndex指示的位置是新元素將會被插入的位置。ReadIndex指向的位置中的元素將會在下一次pop操作中被彈出。
(2)消費者線程拷貝數組ReadIndex位置的元素,然后嘗試用CAS操作將ReadIndex加1。如果操作成功消費者成功的將數據出列。因為CAS操作是原子的,所以只有唯一的線程可以在同一時刻更新ReadIndex的值。如果操作失敗,讀取新的ReadIndex值,以重復以上操作(copy數據,CAS)。
(3)現在又有一個消費者將元素出列,隊列變成空。
(4)現在有一個生產者正在向隊列中添加元素。它已經成功的申請了空間,但尚未完成數據拷貝。任何其它企圖從隊列中移除元素的消費者都會發現隊列非空(因為writeIndex不等于readIndex)。但它不能讀取readIndex所指向位置中的數據,因為readIndex與MaximumReadIndex相等。消費者將會在do循環中不斷的反復嘗試,直到生產者完成數據拷貝增加MaximumReadIndex的值,或者隊列變成空(這在多個消費者的場景下會發生)。
(5)當生產者完成數據拷貝,隊列的大小是1,消費者線程可以讀取這個數據了。
在多于一個生產者線程的情況下“讓出”CPU的必要性
enqueue函數中使用了sched_yiedld()來主動的讓出CPU,對于一個無鎖的算法而言,這個調用看起來有點奇怪。
多線程環境下影響性能的其中一個因素就是Cache損壞。而產生Cache損壞的一種情況就是一個線程被搶占,操作系統需要保存被搶占線程的上下文,然后將被選中作為下一個調度線程的上下文載入。此時Cache中緩存的數據都會失效,因為它是被搶占線程的數據而不是新線程的數據。
所以,當此算法調用sched_yield()意味著告訴操作系統:“我要把處理器時間讓給其它線程,因為我要等待某件事情的發生”。無鎖算法和通過阻塞機制同步的算法的一個主要區別在于無鎖算法不會阻塞在線程同步上。
那么為什么在這里我們要主動請求操作系統搶占自己呢? 它與有多少個生產者線程在并發的往隊列中存放數據有關:每個生產者線程所執行的CAS操作都必須嚴格遵循FIFO次序,一個用于申請空間,另一個用于通知消費者數據已經寫入完成可以被讀取了。
如果應用程序只有唯一的生產者操作這個隊列,sche_yield()將永遠沒有機會被調用,第2個CAS操作永遠不會失敗。因為在一個生產者的情況下沒有人能破壞生產者執行這兩個CAS操作的FIFO順序。
而當多于一個生產者線程往隊列中存放數據的時候,問題就出現了。概括來說,一個生產者通過第1個
CAS操作申請空間,然后將數據寫入到申請到的空間中,然后執行第2個CAS操作通知消費者數據準備完畢可供讀取了。這第2個CAS操作必須遵循FIFO順序,也就是說,如果A線程第首先執行完第一個CAS操作,那么它也要第1個執行完第2個CAS操作,如果A線程在執行完第一個CAS操作之后停止,然后B線程執行完第1個CAS操作,那么B線程將無法完成第2個CAS操作,因為它要等待A先完成第2個CAS操作。而這就是問題產生的根源。
源碼
#ifndef _ARRAYLOCKFREEQUEUEIMP_H___
#define _ARRAYLOCKFREEQUEUEIMP_H___
#include "ArrayLockFreeQueue.h"
#include <assert.h>
#include "atom_opt.h"
template <typename ELEM_T, QUEUE_INT Q_SIZE>
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
m_writeIndex(0),
m_readIndex(0),
m_maximumReadIndex(0)
{
m_count = 0;
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
{
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
{
return (a_count % Q_SIZE); // 取余的時候
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
{
QUEUE_INT currentWriteIndex = m_writeIndex;
QUEUE_INT currentReadIndex = m_readIndex;
if(currentWriteIndex>=currentReadIndex)
return currentWriteIndex - currentReadIndex;
else
return Q_SIZE + currentWriteIndex - currentReadIndex;
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
{
QUEUE_INT currentWriteIndex; // 獲取寫指針的位置
QUEUE_INT currentReadIndex;
// 1. 獲取可寫入的位置
do
{
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if(countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex))
{
return false; // 隊列已經滿了
}
// 目的是為了獲取一個能寫入的位置
} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
// 獲取寫入位置后 currentWriteIndex 是一個臨時變量,保存我們寫入的位置
// We know now that this index is reserved for us. Use it to save the data
m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把數據更新到對應的位置
// 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
{
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield(); // 當線程超過cpu核數的時候如果不讓出cpu導致一直循環在此。
}
AtomicAdd(&m_count, 1);
return true;
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::try_dequeue(ELEM_T &a_data)
{
return dequeue(a_data);
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
QUEUE_INT currentMaximumReadIndex;
QUEUE_INT currentReadIndex;
do
{
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;
if(countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的
// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
AtomicSub(&m_count, 1); // 真正讀取到了數據,元素-1
return true;
}
} while(true);
assert(0);
// Add this return statement to avoid compiler warnings
return false;
}
#endif
源碼測試
#include "ArrayLockFreeQueue.h"
ArrayLockFreeQueue<int> arraylockfreequeue;
void *arraylockfreequeue_producer_thread(void *argv)
{
PRINT_THREAD_INTO();
int count = 0;
int write_failed_count = 0;
for (int i = 0; i < s_queue_item_num;)
{
if (arraylockfreequeue.enqueue(count)) // enqueue的順序是無法保證的,我們只能計算enqueue的個數
{
count = lxx_atomic_add(&s_count_push, 1);
i++;
}
else
{
write_failed_count++;
// printf("%s %lu enqueue failed, q:%d\n", __FUNCTION__, pthread_self(), arraylockfreequeue.size());
sched_yield();
// usleep(10000);
}
}
// printf("%s %lu write_failed_count:%d\n", __FUNCTION__, pthread_self(), write_failed_count)
PRINT_THREAD_LEAVE();
return NULL;
}
void *arraylockfreequeue_consumer_thread(void *argv)
{
int last_value = 0;
PRINT_THREAD_INTO();
int value = 0;
int read_failed_count = 0;
while (true)
{
if (arraylockfreequeue.dequeue(value))
{
if (s_consumer_thread_num == 1 && s_producer_thread_num == 1 && (last_value + 1) != value) // 只有一入一出的情況下才有對比意義
{
// printf("pid:%lu, -> value:%d, expected:%d\n", pthread_self(), value, last_value);
}
lxx_atomic_add(&s_count_pop, 1);
last_value = value;
}
else
{
read_failed_count++;
// printf("%s %lu no data, s_count_pop:%d, value:%d\n", __FUNCTION__, pthread_self(), s_count_pop, value);
// usleep(100);
sched_yield();
}
if (s_count_pop >= s_queue_item_num * s_producer_thread_num)
{
// printf("%s dequeue:%d, s_count_pop:%d, %d, %d\n", __FUNCTION__, last_value, s_count_pop, s_queue_item_num, s_consumer_thread_num);
break;
}
}
// printf("%s %lu read_failed_count:%d\n", __FUNCTION__, pthread_self(), read_failed_count)
PRINT_THREAD_LEAVE();
return NULL;
}
總結
基于循環數組的無鎖隊列ArrayLockFreeQueue相實現對簡單。
無鎖消息隊列適用于10w+每秒的數據吞吐以及數據操作耗時較少場景。
currentMaximumReadIndex表示其之前的數據可以讀取,本身所在的位置不可讀取。
原文鏈接:https://blog.csdn.net/Long_xu/article/details/127026178
相關推薦
- 2021-12-07 詳解C語言編程之thread多線程_C 語言
- 2022-05-11 SpringBoot整合RabbitMq與高級特性
- 2022-12-10 c語言如何設置隨機數及逐行解析_C 語言
- 2022-10-05 WPF實現好看的Loading動畫的示例代碼_C#教程
- 2022-12-24 ReactNative支付密碼輸入框實現詳解_React
- 2022-05-27 numpy模塊中axis的理解與使用_python
- 2022-05-28 利用For循環遍歷Python字典的三種方法實例_python
- 2021-12-11 由redux到react-redux再到rematch
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支