網站首頁 編程語言 正文
JUC阻塞隊列BlockingQueue---LinkedBlockingQueue
- LinkedBlockingQueue
- 使用
- 原理
- 鏈表結構
- 構造方法
- 內部常量
- 入隊put方法
- 出隊take方法
什么是阻塞隊列?
LinkedBlockingQueue
LinkedBlockingQueue是一個基于鏈表實現的阻塞隊列
,該阻塞隊列的大小默認為Integer.MAX_VALUE
,由于這個數值特別大,所以 LinkedBlockingQueue 也被稱作無界隊列,代表它幾乎沒有界限(隨著元素的添加,隊列的大小會動態增加,如果剩余內存不足,會出現OOM)。為了避免隊列過大造成機器負載或者內存爆滿的情況出現,在使用的時候建議手動傳一個隊列的大小
。
使用
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
private static final int QUEUE_CAPACITY = 5;
private static final int PRODUCER_DELAY_MS = 1000;
private static final int CONSUMER_DELAY_MS = 2000;
public static void main(String[] args) throws InterruptedException {
// 創建一個容量為QUEUE_CAPACITY的阻塞隊列
BlockingQueue<String> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
// 創建一個無界的阻塞隊列
BlockingQueue<String> queueMax = new LinkedBlockingQueue<>();
// 創建一個生產者線程
new Thread(() -> {
while (true) {
try {
// 在隊列滿時阻塞
queue.put("producer");
System.out.println("生產了一個元素,隊列中元素個數:" + queue.size());
Thread.sleep(PRODUCER_DELAY_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 創建一個消費者線程
new Thread(() -> {
while (true) {
try {
// 在隊列為空時阻塞
String element = queue.take();
System.out.println("消費了一個元素,隊列中元素個數:" + queue.size());
Thread.sleep(CONSUMER_DELAY_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
原理
LinkedBlockingQueue內部由單鏈表實現,只能從head取元素,從tail添加元素。LinkedBlockingQueue采用兩把鎖的鎖分離技術實現入隊出隊互不阻塞,添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操作可以并行執行。
鏈表結構
/**
* Linked list node class
*/
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
構造方法
可以看到,當不知道隊列大小時,則默認采用Integer.MAX_VALUE作為隊列的大小。
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化head和last指針為空值節點
last = head = new Node<E>(null);
}
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}, initially containing the elements of the
* given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
內部常量
/** The capacity bound, or Integer.MAX_VALUE if none */
// 隊列容量
private final int capacity;
/** Current number of elements */
// 元素數量
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
// 鏈表頭 本身是不存儲任何元素的,初始化時item指向null
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
// 鏈表尾
private transient Node<E> last;
/** Lock held by take, poll, etc */
// take鎖 鎖分離,提高效率
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
// notEmpty條件
// 當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
// put鎖
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
// notFull條件
// 當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒
private final Condition notFull = putLock.newCondition();
入隊put方法
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
// 入隊元素為空時,拋出空指針異常
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 新建一個節點
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 使用put鎖進行加鎖
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 如果當前隊列數量和隊列最大值相等,則進行notFull等待(阻塞生產者線程)
while (count.get() == capacity) {
notFull.await();
}
// 隊列不滿時,入隊
enqueue(node);
// 獲取當前隊列長度,同時將當前隊列長度+1
c = count.getAndIncrement();
// 如果現隊列長度小于容量,notFull條件隊列轉同步隊列,準備喚醒一個阻塞在notFull條件上的線程(可以繼續入隊)
// 這里為啥要喚醒一下呢?
// 因為可能有很多線程阻塞在notFull這個條件上,而取元素時只有取之前隊列是滿的才會喚醒notFull,此處不用等到取元素時才喚醒
if (c + 1 < capacity)
notFull.signal();
} finally {
// put鎖解鎖,喚醒其余生產者線程入隊
putLock.unlock();
}
// 如果原隊列長度為0,;立即喚醒阻塞在notEmpty上的線程
if (c == 0)
signalNotEmpty();
}
/**
* Links node at end of queue.
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
// 直接加到last后面,last指向入隊元素
last = last.next = node;
}
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// takeLock加鎖
takeLock.lock();
try {
// notEmpty條件隊列轉同步隊列,準備喚醒阻塞在notEmpty上的線程
notEmpty.signal();
} finally {
// takeLock解鎖,喚醒消費者線程
takeLock.unlock();
}
}
出隊take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// takeLock加鎖
takeLock.lockInterruptibly();
try {
// 隊列空的時候,則進行notEmpty等待(消費者線程阻塞)
while (count.get() == 0) {
notEmpty.await();
}
// 獲取元素,出隊
x = dequeue();
// 獲取原隊列長度,同時原隊列長度-1
c = count.getAndDecrement();
if (c > 1)
// 如果原隊列長度大于1,則喚醒其余消費者線程
notEmpty.signal();
} finally {
// takeLock解鎖
takeLock.unlock();
}
// 為什么隊列是滿的才喚醒阻塞在notFull上的線程呢?
// 因為喚醒是需要加putLock的,這是為了減少鎖的次數,所以,這里索性在放完元素就檢測一下,未滿就喚醒其它notFull上的線程,
// 這也是鎖分離帶來的代價
// 如果取之前隊列長度等于容量(已滿),則喚醒阻塞在notFull的線程
if (c == capacity)
signalNotFull();
return x;
}
/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// head節點本身是不存儲任何元素的
// 這里把head刪除,并把head下一個節點作為新的值
// 并把其值置空,返回原來的值
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// notFull條件隊列轉同步隊列,準備喚醒阻塞在notFull上的線程
notFull.signal();
} finally {
// putLock解鎖,真正的喚醒生產者線程
putLock.unlock();
}
}
help GC
原文鏈接:https://blog.csdn.net/qq_41139119/article/details/131367263
- 上一篇:沒有了
- 下一篇:沒有了
相關推薦
- 2023-03-05 快速修改運行中的docker容器端口映射的三種方式_docker
- 2022-09-30 Spi機制在Android開發的應用示例詳解_Android
- 2022-10-09 淺談Redis處理接口冪等性的兩種方案_Redis
- 2022-08-21 python深度學習tensorflow安裝調試教程_python
- 2022-06-06 ceph集群RadosGW對象存儲使用詳解_其它綜合
- 2022-06-12 C語言?詳細解析時間復雜度與空間復雜度_C 語言
- 2023-04-27 React中state屬性和生命周期的使用_React
- 2022-05-29 Docker鏡像與容器的導入導出操作實踐_docker
- 欄目分類
-
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支