網(wǎng)站首頁 編程語言 正文
基于循環(huán)數(shù)組的無鎖隊(duì)列
- 類接口和變量
- CAS的使用
- enqueue入隊(duì)列
- dequeue出隊(duì)列
- 在多于一個(gè)生產(chǎn)者線程的情況下“讓出”CPU的必要性
- 源碼
- 源碼測試
- 總結(jié)
- 后言
類接口和變量
#ifndef _ARRAYLOCKFREEQUEUE_H___
#define _ARRAYLOCKFREEQUEUE_H___
#include <stdint.h>
#ifdef _WIN64
#define QUEUE_INT int64_t
#else
#define QUEUE_INT unsigned long
#endif
#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16
template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
class ArrayLockFreeQueue
{
public:
ArrayLockFreeQueue();
virtual ~ArrayLockFreeQueue();
QUEUE_INT size();
bool enqueue(const ELEM_T &a_data);//入隊(duì)
bool dequeue(ELEM_T &a_data);// 出隊(duì)
bool try_dequeue(ELEM_T &a_data);//嘗試入隊(duì)
private:
ELEM_T m_thequeue[Q_SIZE];
volatile QUEUE_INT m_count;//隊(duì)列的元素個(gè)數(shù)
volatile QUEUE_INT m_writeIndex;//新元素入隊(duì)時(shí)存放位置在數(shù)組中的下標(biāo)
volatile QUEUE_INT m_readIndex;//下一個(gè)出隊(duì)元素在數(shù)組中的下標(biāo)
volatile QUEUE_INT m_maximumReadIndex;// 最后一個(gè)已經(jīng)完成入隊(duì)操作的元素在數(shù)組中的下標(biāo)
inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};
#include "ArrayLockFreeQueueImp.h"
#endif
m_maximumReadIndex: 最后一個(gè)已經(jīng)完成入列操作的元素在數(shù)組中的下標(biāo)。如果它的值跟m_writeIndex不一致,表明有寫請(qǐng)求尚未完成。這意味著,有寫請(qǐng)求成功申請(qǐng)了空間但數(shù)據(jù)還沒完全寫進(jìn)隊(duì)列。所以如果有線程要讀取,必須要等到寫線程將數(shù)據(jù)完全寫入到隊(duì)列之后。
必須指明的是使用3種不同的下標(biāo)都是必須的,因?yàn)殛?duì)列允許任意數(shù)量的生產(chǎn)者和消費(fèi)者圍繞著它工作。
數(shù)組環(huán)形圖:
CAS的使用
使用gcc內(nèi)置的syn_bool_compare_and_swap,但重新做了宏定義封裝。
#ifndef _ATOM_OPT_H___
#define _ATOM_OPT_H___
#ifdef __GNUC__
#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count)
#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count)
#include <sched.h> // sched_yield()
#else
#include <Windows.h>
#ifdef _WIN64
#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange64(a_ptr, a_newVal, a_oldVal))
#define sched_yield() SwitchToThread()
#define AtomicAdd(a_ptr, num) InterlockedIncrement64(a_ptr)
#define AtomicSub(a_ptr, num) InterlockedDecrement64(a_ptr)
#else
#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange(a_ptr, a_newVal, a_oldVal))
#define sched_yield() SwitchToThread()
#define AtomicAdd(a_ptr, num) InterlockedIncrement(a_ptr)
#define AtomicSub(a_ptr, num) InterlockedDecrement(a_ptr)
#endif
#endif
#endif
enqueue入隊(duì)列
template <typename ELEM_T, QUEUE_INT Q_SIZE>
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
{
return (a_count % Q_SIZE); // 取余的時(shí)候
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
{
QUEUE_INT currentWriteIndex; // 獲取寫指針的位置
QUEUE_INT currentReadIndex;
// 1. 獲取可寫入的位置
do
{
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if(countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex))
{
return false; // 隊(duì)列已經(jīng)滿了
}
// 目的是為了獲取一個(gè)能寫入的位置
} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
// 獲取寫入位置后 currentWriteIndex 是一個(gè)臨時(shí)變量,保存我們寫入的位置
// We know now that this index is reserved for us. Use it to save the data
m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把數(shù)據(jù)更新到對(duì)應(yīng)的位置
// 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
{
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield(); // 當(dāng)線程超過cpu核數(shù)的時(shí)候如果不讓出cpu導(dǎo)致一直循環(huán)在此。
}
AtomicAdd(&m_count, 1);
return true;
}
分析:
(1)對(duì)于下圖,隊(duì)列中存放了兩個(gè)元素。WriteIndex指示的位置是新元素將會(huì)被插入的位置。ReadIndex指向的位置中的元素將會(huì)在下一次pop操作中被彈出。
(2)當(dāng)生產(chǎn)者準(zhǔn)備將數(shù)據(jù)插入到隊(duì)列中,它首先通過增加WriteIndex的值來申請(qǐng)空間。MaximumReadIndex指向最后一個(gè)存放有效數(shù)據(jù)的位置(也就是實(shí)際的隊(duì)列尾)。
(3)一旦空間的申請(qǐng)完成,生產(chǎn)者就可以將數(shù)據(jù)拷貝到剛剛申請(qǐng)到的位置中。完成之后增加MaximumReadIndex使得它與WriteIndex的一致。
(4)現(xiàn)在隊(duì)列中有3個(gè)元素,接著又有一個(gè)生產(chǎn)者嘗試向隊(duì)列中插入元素。
(5)在第一個(gè)生產(chǎn)者完成數(shù)據(jù)拷貝之前,又有另外一個(gè)生產(chǎn)者申請(qǐng)了一個(gè)新的空間準(zhǔn)備拷貝數(shù)據(jù)。現(xiàn)在有兩個(gè)生產(chǎn)者同時(shí)向隊(duì)列插入數(shù)據(jù)。
(6)現(xiàn)在生產(chǎn)者開始拷貝數(shù)據(jù),在完成拷貝之后,對(duì)MaximumReadIndex的遞增操作必須嚴(yán)格遵循一個(gè)順序:第一個(gè)生產(chǎn)者線程首先遞增MaximumReadIndex,接著才輪到第二個(gè)生產(chǎn)者。 這個(gè)順序必須被嚴(yán)格遵守的原因是,我們必須保證數(shù)據(jù)被完全拷貝到隊(duì)列之后才允許消費(fèi)者線程將其出列。
第一個(gè)生產(chǎn)者完成了數(shù)據(jù)拷貝,并對(duì)MaximumReadIndex完成了遞增。
(7)現(xiàn)在第二個(gè)生產(chǎn)者可以遞增MaximumReadIndex了;第二個(gè)生產(chǎn)者完成了對(duì)MaximumReadIndex的遞增,現(xiàn)在隊(duì)列中有5個(gè)元素。
dequeue出隊(duì)列
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
QUEUE_INT currentMaximumReadIndex;
QUEUE_INT currentReadIndex;
do
{
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;
if(countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時(shí)位置讀取的
// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
AtomicSub(&m_count, 1); // 真正讀取到了數(shù)據(jù),元素-1
return true;
}
} while(true);
assert(0);
// Add this return statement to avoid compiler warnings
return false;
}
分析:
(1)以下插圖展示了元素出列的時(shí)候各種下標(biāo)是如何變化的,隊(duì)列中初始有2個(gè)元素。WriteIndex指示的位置是新元素將會(huì)被插入的位置。ReadIndex指向的位置中的元素將會(huì)在下一次pop操作中被彈出。
(2)消費(fèi)者線程拷貝數(shù)組ReadIndex位置的元素,然后嘗試用CAS操作將ReadIndex加1。如果操作成功消費(fèi)者成功的將數(shù)據(jù)出列。因?yàn)镃AS操作是原子的,所以只有唯一的線程可以在同一時(shí)刻更新ReadIndex的值。如果操作失敗,讀取新的ReadIndex值,以重復(fù)以上操作(copy數(shù)據(jù),CAS)。
(3)現(xiàn)在又有一個(gè)消費(fèi)者將元素出列,隊(duì)列變成空。
(4)現(xiàn)在有一個(gè)生產(chǎn)者正在向隊(duì)列中添加元素。它已經(jīng)成功的申請(qǐng)了空間,但尚未完成數(shù)據(jù)拷貝。任何其它企圖從隊(duì)列中移除元素的消費(fèi)者都會(huì)發(fā)現(xiàn)隊(duì)列非空(因?yàn)閣riteIndex不等于readIndex)。但它不能讀取readIndex所指向位置中的數(shù)據(jù),因?yàn)閞eadIndex與MaximumReadIndex相等。消費(fèi)者將會(huì)在do循環(huán)中不斷的反復(fù)嘗試,直到生產(chǎn)者完成數(shù)據(jù)拷貝增加MaximumReadIndex的值,或者隊(duì)列變成空(這在多個(gè)消費(fèi)者的場景下會(huì)發(fā)生)。
(5)當(dāng)生產(chǎn)者完成數(shù)據(jù)拷貝,隊(duì)列的大小是1,消費(fèi)者線程可以讀取這個(gè)數(shù)據(jù)了。
在多于一個(gè)生產(chǎn)者線程的情況下“讓出”CPU的必要性
enqueue函數(shù)中使用了sched_yiedld()來主動(dòng)的讓出CPU,對(duì)于一個(gè)無鎖的算法而言,這個(gè)調(diào)用看起來有點(diǎn)奇怪。
多線程環(huán)境下影響性能的其中一個(gè)因素就是Cache損壞。而產(chǎn)生Cache損壞的一種情況就是一個(gè)線程被搶占,操作系統(tǒng)需要保存被搶占線程的上下文,然后將被選中作為下一個(gè)調(diào)度線程的上下文載入。此時(shí)Cache中緩存的數(shù)據(jù)都會(huì)失效,因?yàn)樗潜粨屨季€程的數(shù)據(jù)而不是新線程的數(shù)據(jù)。
所以,當(dāng)此算法調(diào)用sched_yield()意味著告訴操作系統(tǒng):“我要把處理器時(shí)間讓給其它線程,因?yàn)槲乙却臣虑榈陌l(fā)生”。無鎖算法和通過阻塞機(jī)制同步的算法的一個(gè)主要區(qū)別在于無鎖算法不會(huì)阻塞在線程同步上。
那么為什么在這里我們要主動(dòng)請(qǐng)求操作系統(tǒng)搶占自己呢? 它與有多少個(gè)生產(chǎn)者線程在并發(fā)的往隊(duì)列中存放數(shù)據(jù)有關(guān):每個(gè)生產(chǎn)者線程所執(zhí)行的CAS操作都必須嚴(yán)格遵循FIFO次序,一個(gè)用于申請(qǐng)空間,另一個(gè)用于通知消費(fèi)者數(shù)據(jù)已經(jīng)寫入完成可以被讀取了。
如果應(yīng)用程序只有唯一的生產(chǎn)者操作這個(gè)隊(duì)列,sche_yield()將永遠(yuǎn)沒有機(jī)會(huì)被調(diào)用,第2個(gè)CAS操作永遠(yuǎn)不會(huì)失敗。因?yàn)樵谝粋€(gè)生產(chǎn)者的情況下沒有人能破壞生產(chǎn)者執(zhí)行這兩個(gè)CAS操作的FIFO順序。
而當(dāng)多于一個(gè)生產(chǎn)者線程往隊(duì)列中存放數(shù)據(jù)的時(shí)候,問題就出現(xiàn)了。概括來說,一個(gè)生產(chǎn)者通過第1個(gè)
CAS操作申請(qǐng)空間,然后將數(shù)據(jù)寫入到申請(qǐng)到的空間中,然后執(zhí)行第2個(gè)CAS操作通知消費(fèi)者數(shù)據(jù)準(zhǔn)備完畢可供讀取了。這第2個(gè)CAS操作必須遵循FIFO順序,也就是說,如果A線程第首先執(zhí)行完第一個(gè)CAS操作,那么它也要第1個(gè)執(zhí)行完第2個(gè)CAS操作,如果A線程在執(zhí)行完第一個(gè)CAS操作之后停止,然后B線程執(zhí)行完第1個(gè)CAS操作,那么B線程將無法完成第2個(gè)CAS操作,因?yàn)樗却鼳先完成第2個(gè)CAS操作。而這就是問題產(chǎn)生的根源。
源碼
#ifndef _ARRAYLOCKFREEQUEUEIMP_H___
#define _ARRAYLOCKFREEQUEUEIMP_H___
#include "ArrayLockFreeQueue.h"
#include <assert.h>
#include "atom_opt.h"
template <typename ELEM_T, QUEUE_INT Q_SIZE>
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
m_writeIndex(0),
m_readIndex(0),
m_maximumReadIndex(0)
{
m_count = 0;
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
{
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
{
return (a_count % Q_SIZE); // 取余的時(shí)候
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
{
QUEUE_INT currentWriteIndex = m_writeIndex;
QUEUE_INT currentReadIndex = m_readIndex;
if(currentWriteIndex>=currentReadIndex)
return currentWriteIndex - currentReadIndex;
else
return Q_SIZE + currentWriteIndex - currentReadIndex;
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
{
QUEUE_INT currentWriteIndex; // 獲取寫指針的位置
QUEUE_INT currentReadIndex;
// 1. 獲取可寫入的位置
do
{
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if(countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex))
{
return false; // 隊(duì)列已經(jīng)滿了
}
// 目的是為了獲取一個(gè)能寫入的位置
} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
// 獲取寫入位置后 currentWriteIndex 是一個(gè)臨時(shí)變量,保存我們寫入的位置
// We know now that this index is reserved for us. Use it to save the data
m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把數(shù)據(jù)更新到對(duì)應(yīng)的位置
// 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
{
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield(); // 當(dāng)線程超過cpu核數(shù)的時(shí)候如果不讓出cpu導(dǎo)致一直循環(huán)在此。
}
AtomicAdd(&m_count, 1);
return true;
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::try_dequeue(ELEM_T &a_data)
{
return dequeue(a_data);
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
QUEUE_INT currentMaximumReadIndex;
QUEUE_INT currentReadIndex;
do
{
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;
if(countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時(shí)位置讀取的
// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
AtomicSub(&m_count, 1); // 真正讀取到了數(shù)據(jù),元素-1
return true;
}
} while(true);
assert(0);
// Add this return statement to avoid compiler warnings
return false;
}
#endif
源碼測試
#include "ArrayLockFreeQueue.h"
ArrayLockFreeQueue<int> arraylockfreequeue;
void *arraylockfreequeue_producer_thread(void *argv)
{
PRINT_THREAD_INTO();
int count = 0;
int write_failed_count = 0;
for (int i = 0; i < s_queue_item_num;)
{
if (arraylockfreequeue.enqueue(count)) // enqueue的順序是無法保證的,我們只能計(jì)算enqueue的個(gè)數(shù)
{
count = lxx_atomic_add(&s_count_push, 1);
i++;
}
else
{
write_failed_count++;
// printf("%s %lu enqueue failed, q:%d\n", __FUNCTION__, pthread_self(), arraylockfreequeue.size());
sched_yield();
// usleep(10000);
}
}
// printf("%s %lu write_failed_count:%d\n", __FUNCTION__, pthread_self(), write_failed_count)
PRINT_THREAD_LEAVE();
return NULL;
}
void *arraylockfreequeue_consumer_thread(void *argv)
{
int last_value = 0;
PRINT_THREAD_INTO();
int value = 0;
int read_failed_count = 0;
while (true)
{
if (arraylockfreequeue.dequeue(value))
{
if (s_consumer_thread_num == 1 && s_producer_thread_num == 1 && (last_value + 1) != value) // 只有一入一出的情況下才有對(duì)比意義
{
// printf("pid:%lu, -> value:%d, expected:%d\n", pthread_self(), value, last_value);
}
lxx_atomic_add(&s_count_pop, 1);
last_value = value;
}
else
{
read_failed_count++;
// printf("%s %lu no data, s_count_pop:%d, value:%d\n", __FUNCTION__, pthread_self(), s_count_pop, value);
// usleep(100);
sched_yield();
}
if (s_count_pop >= s_queue_item_num * s_producer_thread_num)
{
// printf("%s dequeue:%d, s_count_pop:%d, %d, %d\n", __FUNCTION__, last_value, s_count_pop, s_queue_item_num, s_consumer_thread_num);
break;
}
}
// printf("%s %lu read_failed_count:%d\n", __FUNCTION__, pthread_self(), read_failed_count)
PRINT_THREAD_LEAVE();
return NULL;
}
總結(jié)
基于循環(huán)數(shù)組的無鎖隊(duì)列ArrayLockFreeQueue相實(shí)現(xiàn)對(duì)簡單。
無鎖消息隊(duì)列適用于10w+每秒的數(shù)據(jù)吞吐以及數(shù)據(jù)操作耗時(shí)較少場景。
currentMaximumReadIndex表示其之前的數(shù)據(jù)可以讀取,本身所在的位置不可讀取。
原文鏈接:https://blog.csdn.net/Long_xu/article/details/127026178
相關(guān)推薦
- 2023-07-08 qt報(bào)錯(cuò)***Fault tolerant heap shim applied to current
- 2022-10-11 XGBoost與GBDT和LGBM區(qū)別
- 2023-06-16 Pytorch中的?torch.distributions庫詳解_python
- 2023-07-27 TypeScript類和多態(tài)、抽象類、訪問修飾符
- 2023-07-26 vite項(xiàng)目中處理各種靜態(tài)資源的引入方式介紹
- 2022-04-10 Python?tkinter實(shí)現(xiàn)計(jì)算器功能_python
- 2022-06-21 C語言分別實(shí)現(xiàn)棧和隊(duì)列詳解流程_C 語言
- 2022-06-19 C語言圖文并茂講解分支語句用法_C 語言
- 欄目分類
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支