網站首頁 編程語言 正文
BlockingQueue原理以及實現
- 一,BlockingQueue
- 1,主要常用的隊列有如下
- 2,基本工作原理實現如下
- 3,基本api使用如下
- 二,源碼剖析
- 2.1,ArrayBlockingQueue
- 三,總結
一,BlockingQueue
在最常見的使用到這個阻塞隊列的地方,就是我們耳熟能詳的線程池里面了,作為我們線程池的一大最大參與者,也是AQS的一個具體實現,因此可以好好的深入了解一下這個BlockingQueue阻塞隊列。
用一句話描述這個阻塞隊列就是:它是線程的一個通信工具,在任意時刻,不管并發有多高,在單jvm進程上,同一時間永遠只有一個線程能夠對隊列進行入隊和出隊的操作,它的特性是在任意時刻只有一個線程可以進行take或者put操作。因此這個隊列是一個線程安全的隊列。
比較適用于生產者和消費者的場景,因此適用的應用場景如下
線程池,springCloud-Eureka的三級緩存,Nacos,Netty,RakectMq等
所有的阻塞隊列都都實現了對這個BlockingQueue接口
public interface BlockingQueue<E> extends Queue<E>
1,主要常用的隊列有如下
ArrayBlockingQueue: 由數組支持的有界隊列
LinkedBlockingQueue: 由鏈接節點支持的可選有界隊列
PriorityBlockingQueue: 由優先級堆支持的無界優先級隊列
DelayQueue: 由優先級堆支持的、基于時間的調度隊列
2,基本工作原理實現如下
1,以一個有界隊列為例,首先消費者這邊獲取到鎖,然后會生產商品,然后會往隊列中填滿數據,隊列填滿之后,生產者端會進行阻塞,同時會釋放這把鎖,并且會通知這個消費者趕緊去消費。當然內部也做了很多事情,不一定就是說一定要阻塞隊列滿了之后才會去喚醒生產者去消費,而是消費者那邊也會有一個監聽事件,只有隊列不為空,就會有這個消費者來消費。
2,消費者在接收到生產者的通知之后呢,就會先去獲取到這把鎖,然后對里面的產品進行消費,當隊列里面的產品都被消費完成之后,消費者這邊又會釋放這把鎖,然后將自身阻塞,并同時去喚醒這個生產者繼續生產產品。
3,生產者又獲取到鎖,然后重復執行第一步。
3,基本api使用如下
二,源碼剖析
在了解過一定的工作原理之后,接下來可以對源碼分析一波。
2.1,ArrayBlockingQueue
這里主要通過這個ArrayBlockingQueue為例,來描述一下這個阻塞隊列的工作流程
BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND);
這個構造方法里面有如下參數
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); //非公平鎖
notEmpty = lock.newCondition(); //條件對象,用于喚醒指定線程
notFull = lock.newCondition(); //條件對象
}
生產者會向隊列中put產品,生產者后會持有鎖,此時會向隊列中存放產品,如果隊列滿了,則會阻塞自己,并且在最后會釋放鎖。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock; //生產者加鎖
lock.lockInterruptibly();
try {
while (count == items.length) //如果隊列滿了,則會阻塞
notFull.await();
enqueue(e);
} finally {
lock.unlock(); //釋放鎖
}
}
既然涉及到ReentrantLock,那么就用從之前的AQS里面講起了,這里面這要是一個CLH同步等待隊列,由一個雙向鏈表和一個同步阻塞器組成,同步阻塞器會有一個state和一個exclusiveOwnerThread狀態組成,state=0表示當前沒有對象獲取到鎖,可以來競爭鎖。每個結點由一個前驅指針和一個后繼指針,并且里面有一個waitStatus等待狀態,該狀態主要表示下一個結點的存活狀態。
這里的話不會像之前一樣使用這個CLH同步等待隊列,而是加入了一種新的Condition條件等待隊列,如下圖。由firstWaiter和nextWaiter組成的單向鏈表隊列,里面的waitStatus為CONDITION:-2 。也就是說如果當前生產者結點后面的結點又是一個生產者節點,因為期間可能存在多個生產者的線程,而為了喚醒接下來的消費者,就會創建一個條件等待隊列,去存儲后面的生產者結點。
就是說在CLH同步等待隊列中,當前結點為生產者的話,在阻塞隊列滿了之后,如果CLH中的下一個節點還是生產者,則會將waitStatus的狀態設置成-2,并將下一個節點移動到這個條件等待隊列里面并進行排隊,如果下一個結點還是,又會將下一個結點移動到這個條件等待隊列里面并進行排隊。知道下一個結點是消費者為止。
await()釋放鎖的流程如下
public final void await() throws InterruptedException {
//線程是否被中斷,如果被中斷,直接拋異常
if (Thread.interrupted())
throw new InterruptedException();
//條件等待隊列,會構建一個新的隊列
Node node = addConditionWaiter();
//釋放鎖,并對對應的結點進行喚醒操作
int savedState = fullyRelease(node);
int interruptMode = 0;
//判斷當前結點是在條件隊列里面還是在同步隊列里面
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
構建條件等待隊列如下
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
出隊,消費者在獲取產品時,產品就會出隊,與此同時,在隊列出隊成功之后,隊列中就會有一個空位,會調用notFull.signal()方法,通知生產者可以去生產產品了。并將這個條件等待隊列放回這個CLH隊列里面,只有在CLH隊列里面才會獲取鎖。最后在CLH中才能進行unPark釋放鎖的操作。
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//隊列中有空位,通知生產者生產產品
notFull.signal();
return x;
}
消費者獲取產品
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
三,總結
BlockingQueue也是基于這個AQS的方式實現的,主要是利用這個生產者和消費者這個模型來實現。通過這個AQS中的CLH同步隊列來對節點的鎖的阻塞和釋放,期間利用了這個條件等待隊列來實現,如果存在多個生產者的線程的情況下,就會將這些線程加入到一個條件等待的隊列里面。并將這個節點的狀態改為-2,condition狀態。在全部進入條件等待隊列之后,這個鎖還在并沒有釋放,因此最后又需要將這個條件等待隊列里面的結點加回到CLH同步隊列中,再進行排隊的釋放這個鎖。結點出隊的時候,然后生產者會通過一個singal監聽這個消費者,每當這個阻塞隊列里面出隊,有一個位置的的時候,生產者就會生產這個產品。消費者也會監聽這個隊列,隊列中只要不為空,就回去消費隊列中的產品。
獲取鎖的條件
只有在CLH隊列里等待的Node結點并且前驅結點的 waitStatus 為sinal = -1的可被喚醒的結點。
條件隊列里面的這些節點是不能獲取到鎖的。
原文鏈接:https://blog.csdn.net/zhenghuishengq/article/details/125710294
- 上一篇:Linux創建定時執行任務
- 下一篇:aqs原理初探以及公平鎖和非公平鎖實現
相關推薦
- 2022-04-18 python列表推導式實現找出列表中長度大于5的名字_python
- 2023-01-30 delphi?判斷字符串是否包含漢字,正則版與非正則版實現_Delphi
- 2022-06-02 ubuntu安裝jupyter并設置遠程訪問的實現_python
- 2022-10-06 python中關于對super()函數疑問解惑_python
- 2021-11-03 Linux系統下grub.cfg文件損壞修復步驟_Linux
- 2022-10-18 Qt實現柵格布局效果_C 語言
- 2022-03-19 Nginx純配置實現日志實時上報的思路與方法_nginx
- 2024-04-06 MyBatis的一級(同SqlSession會話),二級(不同SqlSession會話)緩存使用
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支