日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達(dá)者為師

網(wǎng)站首頁 編程語言 正文

Linux基礎(chǔ)組件之基于循環(huán)數(shù)組的無鎖隊(duì)列

作者:Long_xu 更新時(shí)間: 2022-09-25 編程語言

基于循環(huán)數(shù)組的無鎖隊(duì)列

  • 類接口和變量
  • CAS的使用
  • enqueue入隊(duì)列
  • dequeue出隊(duì)列
  • 在多于一個(gè)生產(chǎn)者線程的情況下“讓出”CPU的必要性
  • 源碼
  • 源碼測試
  • 總結(jié)
  • 后言

類接口和變量

#ifndef _ARRAYLOCKFREEQUEUE_H___
#define _ARRAYLOCKFREEQUEUE_H___

#include <stdint.h>

#ifdef _WIN64
#define QUEUE_INT int64_t
#else
#define QUEUE_INT unsigned long
#endif

#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16

template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
class ArrayLockFreeQueue
{
public:

	ArrayLockFreeQueue();
	virtual ~ArrayLockFreeQueue();

	QUEUE_INT size();

	bool enqueue(const ELEM_T &a_data);//入隊(duì)

	bool dequeue(ELEM_T &a_data);// 出隊(duì)

    bool try_dequeue(ELEM_T &a_data);//嘗試入隊(duì)

private:

	ELEM_T m_thequeue[Q_SIZE];

	volatile QUEUE_INT m_count;//隊(duì)列的元素個(gè)數(shù)
	volatile QUEUE_INT m_writeIndex;//新元素入隊(duì)時(shí)存放位置在數(shù)組中的下標(biāo)

	volatile QUEUE_INT m_readIndex;//下一個(gè)出隊(duì)元素在數(shù)組中的下標(biāo)

	volatile QUEUE_INT m_maximumReadIndex;// 最后一個(gè)已經(jīng)完成入隊(duì)操作的元素在數(shù)組中的下標(biāo)

	inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};

#include "ArrayLockFreeQueueImp.h"

#endif

m_maximumReadIndex: 最后一個(gè)已經(jīng)完成入列操作的元素在數(shù)組中的下標(biāo)。如果它的值跟m_writeIndex不一致,表明有寫請(qǐng)求尚未完成。這意味著,有寫請(qǐng)求成功申請(qǐng)了空間但數(shù)據(jù)還沒完全寫進(jìn)隊(duì)列。所以如果有線程要讀取,必須要等到寫線程將數(shù)據(jù)完全寫入到隊(duì)列之后。

必須指明的是使用3種不同的下標(biāo)都是必須的,因?yàn)殛?duì)列允許任意數(shù)量的生產(chǎn)者和消費(fèi)者圍繞著它工作。

數(shù)組環(huán)形圖:
在這里插入圖片描述

CAS的使用

使用gcc內(nèi)置的syn_bool_compare_and_swap,但重新做了宏定義封裝。

#ifndef _ATOM_OPT_H___
#define _ATOM_OPT_H___

#ifdef __GNUC__
	#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
	#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count)
	#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count)
	#include <sched.h> // sched_yield()
#else

#include <Windows.h>
#ifdef _WIN64
	#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange64(a_ptr, a_newVal, a_oldVal))
	#define sched_yield()	SwitchToThread()
	#define AtomicAdd(a_ptr, num)	InterlockedIncrement64(a_ptr)
	#define AtomicSub(a_ptr, num)	InterlockedDecrement64(a_ptr)
#else
	#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange(a_ptr, a_newVal, a_oldVal))
	#define sched_yield()	SwitchToThread()
	#define AtomicAdd(a_ptr, num)	InterlockedIncrement(a_ptr)
	#define AtomicSub(a_ptr, num)	InterlockedDecrement(a_ptr)
#endif

#endif

#endif

enqueue入隊(duì)列

template <typename ELEM_T, QUEUE_INT Q_SIZE>
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
{
	return (a_count % Q_SIZE);		// 取余的時(shí)候
}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
{
	QUEUE_INT currentWriteIndex;		// 獲取寫指針的位置
	QUEUE_INT currentReadIndex;
	// 1. 獲取可寫入的位置
	do
	{
		currentWriteIndex = m_writeIndex;
		currentReadIndex = m_readIndex;
		if(countToIndex(currentWriteIndex + 1) ==
			countToIndex(currentReadIndex))
		{
			return false;	// 隊(duì)列已經(jīng)滿了	
		}
		// 目的是為了獲取一個(gè)能寫入的位置
	} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
	// 獲取寫入位置后 currentWriteIndex 是一個(gè)臨時(shí)變量,保存我們寫入的位置
	// We know now that this index is reserved for us. Use it to save the data
	m_thequeue[countToIndex(currentWriteIndex)] = a_data;  // 把數(shù)據(jù)更新到對(duì)應(yīng)的位置

	// 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作
 	// update the maximum read index after saving the data. It wouldn't fail if there is only one thread 
	// inserting in the queue. It might fail if there are more than 1 producer threads because this
	// operation has to be done in the same order as the previous CAS
	while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
	{
		 // this is a good place to yield the thread in case there are more
		// software threads than hardware processors and you have more
		// than 1 producer thread
		// have a look at sched_yield (POSIX.1b)
		sched_yield();		// 當(dāng)線程超過cpu核數(shù)的時(shí)候如果不讓出cpu導(dǎo)致一直循環(huán)在此。
	}

	AtomicAdd(&m_count, 1);

	return true;

}

分析:
(1)對(duì)于下圖,隊(duì)列中存放了兩個(gè)元素。WriteIndex指示的位置是新元素將會(huì)被插入的位置。ReadIndex指向的位置中的元素將會(huì)在下一次pop操作中被彈出。

(2)當(dāng)生產(chǎn)者準(zhǔn)備將數(shù)據(jù)插入到隊(duì)列中,它首先通過增加WriteIndex的值來申請(qǐng)空間。MaximumReadIndex指向最后一個(gè)存放有效數(shù)據(jù)的位置(也就是實(shí)際的隊(duì)列尾)。

(3)一旦空間的申請(qǐng)完成,生產(chǎn)者就可以將數(shù)據(jù)拷貝到剛剛申請(qǐng)到的位置中。完成之后增加MaximumReadIndex使得它與WriteIndex的一致。

(4)現(xiàn)在隊(duì)列中有3個(gè)元素,接著又有一個(gè)生產(chǎn)者嘗試向隊(duì)列中插入元素。

(5)在第一個(gè)生產(chǎn)者完成數(shù)據(jù)拷貝之前,又有另外一個(gè)生產(chǎn)者申請(qǐng)了一個(gè)新的空間準(zhǔn)備拷貝數(shù)據(jù)。現(xiàn)在有兩個(gè)生產(chǎn)者同時(shí)向隊(duì)列插入數(shù)據(jù)。

(6)現(xiàn)在生產(chǎn)者開始拷貝數(shù)據(jù),在完成拷貝之后,對(duì)MaximumReadIndex的遞增操作必須嚴(yán)格遵循一個(gè)順序:第一個(gè)生產(chǎn)者線程首先遞增MaximumReadIndex,接著才輪到第二個(gè)生產(chǎn)者。 這個(gè)順序必須被嚴(yán)格遵守的原因是,我們必須保證數(shù)據(jù)被完全拷貝到隊(duì)列之后才允許消費(fèi)者線程將其出列
第一個(gè)生產(chǎn)者完成了數(shù)據(jù)拷貝,并對(duì)MaximumReadIndex完成了遞增。

(7)現(xiàn)在第二個(gè)生產(chǎn)者可以遞增MaximumReadIndex了;第二個(gè)生產(chǎn)者完成了對(duì)MaximumReadIndex的遞增,現(xiàn)在隊(duì)列中有5個(gè)元素。

dequeue出隊(duì)列

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
	QUEUE_INT currentMaximumReadIndex;
	QUEUE_INT currentReadIndex;

	do
	{
		 // to ensure thread-safety when there is more than 1 producer thread
       	// a second index is defined (m_maximumReadIndex)
		currentReadIndex = m_readIndex;
		currentMaximumReadIndex = m_maximumReadIndex;

		if(countToIndex(currentReadIndex) ==
			countToIndex(currentMaximumReadIndex))		// 如果不為空,獲取到讀索引的位置
		{
			// the queue is empty or
			// a producer thread has allocate space in the queue but is 
			// waiting to commit the data into it
			return false;
		}
		// retrieve the data from the queue
		a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時(shí)位置讀取的

		// try to perfrom now the CAS operation on the read index. If we succeed
		// a_data already contains what m_readIndex pointed to before we 
		// increased it
		if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
		{
			AtomicSub(&m_count, 1);	// 真正讀取到了數(shù)據(jù),元素-1
			return true;
		}
	} while(true);

	assert(0);
	 // Add this return statement to avoid compiler warnings
	return false;

}

分析:
(1)以下插圖展示了元素出列的時(shí)候各種下標(biāo)是如何變化的,隊(duì)列中初始有2個(gè)元素。WriteIndex指示的位置是新元素將會(huì)被插入的位置。ReadIndex指向的位置中的元素將會(huì)在下一次pop操作中被彈出。

(2)消費(fèi)者線程拷貝數(shù)組ReadIndex位置的元素,然后嘗試用CAS操作將ReadIndex加1。如果操作成功消費(fèi)者成功的將數(shù)據(jù)出列。因?yàn)镃AS操作是原子的,所以只有唯一的線程可以在同一時(shí)刻更新ReadIndex的值。如果操作失敗,讀取新的ReadIndex值,以重復(fù)以上操作(copy數(shù)據(jù),CAS)。

(3)現(xiàn)在又有一個(gè)消費(fèi)者將元素出列,隊(duì)列變成空。

(4)現(xiàn)在有一個(gè)生產(chǎn)者正在向隊(duì)列中添加元素。它已經(jīng)成功的申請(qǐng)了空間,但尚未完成數(shù)據(jù)拷貝。任何其它企圖從隊(duì)列中移除元素的消費(fèi)者都會(huì)發(fā)現(xiàn)隊(duì)列非空(因?yàn)閣riteIndex不等于readIndex)。但它不能讀取readIndex所指向位置中的數(shù)據(jù),因?yàn)閞eadIndex與MaximumReadIndex相等。消費(fèi)者將會(huì)在do循環(huán)中不斷的反復(fù)嘗試,直到生產(chǎn)者完成數(shù)據(jù)拷貝增加MaximumReadIndex的值,或者隊(duì)列變成空(這在多個(gè)消費(fèi)者的場景下會(huì)發(fā)生)。

(5)當(dāng)生產(chǎn)者完成數(shù)據(jù)拷貝,隊(duì)列的大小是1,消費(fèi)者線程可以讀取這個(gè)數(shù)據(jù)了。

在多于一個(gè)生產(chǎn)者線程的情況下“讓出”CPU的必要性

enqueue函數(shù)中使用了sched_yiedld()來主動(dòng)的讓出CPU,對(duì)于一個(gè)無鎖的算法而言,這個(gè)調(diào)用看起來有點(diǎn)奇怪。

多線程環(huán)境下影響性能的其中一個(gè)因素就是Cache損壞。而產(chǎn)生Cache損壞的一種情況就是一個(gè)線程被搶占,操作系統(tǒng)需要保存被搶占線程的上下文,然后將被選中作為下一個(gè)調(diào)度線程的上下文載入。此時(shí)Cache中緩存的數(shù)據(jù)都會(huì)失效,因?yàn)樗潜粨屨季€程的數(shù)據(jù)而不是新線程的數(shù)據(jù)。

所以,當(dāng)此算法調(diào)用sched_yield()意味著告訴操作系統(tǒng):“我要把處理器時(shí)間讓給其它線程,因?yàn)槲乙却臣虑榈陌l(fā)生”。無鎖算法和通過阻塞機(jī)制同步的算法的一個(gè)主要區(qū)別在于無鎖算法不會(huì)阻塞在線程同步上。
那么為什么在這里我們要主動(dòng)請(qǐng)求操作系統(tǒng)搶占自己呢? 它與有多少個(gè)生產(chǎn)者線程在并發(fā)的往隊(duì)列中存放數(shù)據(jù)有關(guān):每個(gè)生產(chǎn)者線程所執(zhí)行的CAS操作都必須嚴(yán)格遵循FIFO次序,一個(gè)用于申請(qǐng)空間,另一個(gè)用于通知消費(fèi)者數(shù)據(jù)已經(jīng)寫入完成可以被讀取了。
如果應(yīng)用程序只有唯一的生產(chǎn)者操作這個(gè)隊(duì)列,sche_yield()將永遠(yuǎn)沒有機(jī)會(huì)被調(diào)用,第2個(gè)CAS操作永遠(yuǎn)不會(huì)失敗。因?yàn)樵谝粋€(gè)生產(chǎn)者的情況下沒有人能破壞生產(chǎn)者執(zhí)行這兩個(gè)CAS操作的FIFO順序。
而當(dāng)多于一個(gè)生產(chǎn)者線程往隊(duì)列中存放數(shù)據(jù)的時(shí)候,問題就出現(xiàn)了。概括來說,一個(gè)生產(chǎn)者通過第1個(gè)
CAS操作申請(qǐng)空間,然后將數(shù)據(jù)寫入到申請(qǐng)到的空間中,然后執(zhí)行第2個(gè)CAS操作通知消費(fèi)者數(shù)據(jù)準(zhǔn)備完畢可供讀取了。這第2個(gè)CAS操作必須遵循FIFO順序,也就是說,如果A線程第首先執(zhí)行完第一個(gè)CAS操作,那么它也要第1個(gè)執(zhí)行完第2個(gè)CAS操作,如果A線程在執(zhí)行完第一個(gè)CAS操作之后停止,然后B線程執(zhí)行完第1個(gè)CAS操作,那么B線程將無法完成第2個(gè)CAS操作,因?yàn)樗却鼳先完成第2個(gè)CAS操作。而這就是問題產(chǎn)生的根源。

源碼

#ifndef _ARRAYLOCKFREEQUEUEIMP_H___
#define _ARRAYLOCKFREEQUEUEIMP_H___
#include "ArrayLockFreeQueue.h"

#include <assert.h>
#include "atom_opt.h"

template <typename ELEM_T, QUEUE_INT Q_SIZE>
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
	m_writeIndex(0),
	m_readIndex(0),
	m_maximumReadIndex(0)
{
	m_count = 0;
}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
{

}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
{
	return (a_count % Q_SIZE);		// 取余的時(shí)候
}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
{
	QUEUE_INT currentWriteIndex = m_writeIndex;
	QUEUE_INT currentReadIndex = m_readIndex;

	if(currentWriteIndex>=currentReadIndex)
		return currentWriteIndex - currentReadIndex;
	else
		return Q_SIZE + currentWriteIndex - currentReadIndex;

}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
{
	QUEUE_INT currentWriteIndex;		// 獲取寫指針的位置
	QUEUE_INT currentReadIndex;
	// 1. 獲取可寫入的位置
	do
	{
		currentWriteIndex = m_writeIndex;
		currentReadIndex = m_readIndex;
		if(countToIndex(currentWriteIndex + 1) ==
			countToIndex(currentReadIndex))
		{
			return false;	// 隊(duì)列已經(jīng)滿了	
		}
		// 目的是為了獲取一個(gè)能寫入的位置
	} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
	// 獲取寫入位置后 currentWriteIndex 是一個(gè)臨時(shí)變量,保存我們寫入的位置
	// We know now that this index is reserved for us. Use it to save the data
	m_thequeue[countToIndex(currentWriteIndex)] = a_data;  // 把數(shù)據(jù)更新到對(duì)應(yīng)的位置

	// 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作
 	// update the maximum read index after saving the data. It wouldn't fail if there is only one thread 
	// inserting in the queue. It might fail if there are more than 1 producer threads because this
	// operation has to be done in the same order as the previous CAS
	while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
	{
		 // this is a good place to yield the thread in case there are more
		// software threads than hardware processors and you have more
		// than 1 producer thread
		// have a look at sched_yield (POSIX.1b)
		sched_yield();		// 當(dāng)線程超過cpu核數(shù)的時(shí)候如果不讓出cpu導(dǎo)致一直循環(huán)在此。
	}

	AtomicAdd(&m_count, 1);

	return true;

}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::try_dequeue(ELEM_T &a_data)
{
    return dequeue(a_data);
}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
	QUEUE_INT currentMaximumReadIndex;
	QUEUE_INT currentReadIndex;

	do
	{
		 // to ensure thread-safety when there is more than 1 producer thread
       	// a second index is defined (m_maximumReadIndex)
		currentReadIndex = m_readIndex;
		currentMaximumReadIndex = m_maximumReadIndex;

		if(countToIndex(currentReadIndex) ==
			countToIndex(currentMaximumReadIndex))		// 如果不為空,獲取到讀索引的位置
		{
			// the queue is empty or
			// a producer thread has allocate space in the queue but is 
			// waiting to commit the data into it
			return false;
		}
		// retrieve the data from the queue
		a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時(shí)位置讀取的

		// try to perfrom now the CAS operation on the read index. If we succeed
		// a_data already contains what m_readIndex pointed to before we 
		// increased it
		if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
		{
			AtomicSub(&m_count, 1);	// 真正讀取到了數(shù)據(jù),元素-1
			return true;
		}
	} while(true);

	assert(0);
	 // Add this return statement to avoid compiler warnings
	return false;

}

#endif

源碼測試

#include "ArrayLockFreeQueue.h"
ArrayLockFreeQueue<int> arraylockfreequeue;
void *arraylockfreequeue_producer_thread(void *argv)
{
  PRINT_THREAD_INTO();
  int count = 0;
  int write_failed_count = 0;

  for (int i = 0; i < s_queue_item_num;)
  {
    if (arraylockfreequeue.enqueue(count)) // enqueue的順序是無法保證的,我們只能計(jì)算enqueue的個(gè)數(shù)
    {
      count = lxx_atomic_add(&s_count_push, 1);
      i++;
    }
    else
    {
      write_failed_count++;
      // printf("%s %lu enqueue failed, q:%d\n", __FUNCTION__,  pthread_self(), arraylockfreequeue.size());
      sched_yield();
      // usleep(10000);
    }
  }
  // printf("%s %lu write_failed_count:%d\n", __FUNCTION__, pthread_self(), write_failed_count)
  PRINT_THREAD_LEAVE();
  return NULL;
}

void *arraylockfreequeue_consumer_thread(void *argv)
{
  int last_value = 0;
  PRINT_THREAD_INTO();
  int value = 0;
  int read_failed_count = 0;
  while (true)
  {

    if (arraylockfreequeue.dequeue(value))
    {
      if (s_consumer_thread_num == 1 && s_producer_thread_num == 1 && (last_value + 1) != value) // 只有一入一出的情況下才有對(duì)比意義
      {
        // printf("pid:%lu, -> value:%d, expected:%d\n", pthread_self(), value, last_value);
      }
      lxx_atomic_add(&s_count_pop, 1);
      last_value = value;
    }
    else
    {
      read_failed_count++;
      // printf("%s %lu no data, s_count_pop:%d, value:%d\n", __FUNCTION__, pthread_self(), s_count_pop, value);
      // usleep(100);
      sched_yield();
    }

    if (s_count_pop >= s_queue_item_num * s_producer_thread_num)
    {
      // printf("%s dequeue:%d, s_count_pop:%d, %d, %d\n", __FUNCTION__, last_value, s_count_pop, s_queue_item_num, s_consumer_thread_num);
      break;
    }
  }
  // printf("%s %lu read_failed_count:%d\n", __FUNCTION__, pthread_self(), read_failed_count)
  PRINT_THREAD_LEAVE();
  return NULL;
}

總結(jié)

基于循環(huán)數(shù)組的無鎖隊(duì)列ArrayLockFreeQueue相實(shí)現(xiàn)對(duì)簡單。
無鎖消息隊(duì)列適用于10w+每秒的數(shù)據(jù)吞吐以及數(shù)據(jù)操作耗時(shí)較少場景。
currentMaximumReadIndex表示其之前的數(shù)據(jù)可以讀取,本身所在的位置不可讀取。

原文鏈接:https://blog.csdn.net/Long_xu/article/details/127026178

欄目分類
最近更新