日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

JUC阻塞隊列BlockingQueue---LinkedBlockingQueue

作者:稻草人ZZ 更新時間: 2023-07-04 編程語言

JUC阻塞隊列BlockingQueue---LinkedBlockingQueue

  • LinkedBlockingQueue
  • 使用
  • 原理
    • 鏈表結構
    • 構造方法
    • 內部常量
    • 入隊put方法
    • 出隊take方法

什么是阻塞隊列?

LinkedBlockingQueue

LinkedBlockingQueue是一個基于鏈表實現的阻塞隊列,該阻塞隊列的大小默認為Integer.MAX_VALUE,由于這個數值特別大,所以 LinkedBlockingQueue 也被稱作無界隊列,代表它幾乎沒有界限(隨著元素的添加,隊列的大小會動態增加,如果剩余內存不足,會出現OOM)。為了避免隊列過大造成機器負載或者內存爆滿的情況出現,在使用的時候建議手動傳一個隊列的大小

使用

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueExample {

    private static final int QUEUE_CAPACITY = 5;
    private static final int PRODUCER_DELAY_MS = 1000;
    private static final int CONSUMER_DELAY_MS = 2000;

    public static void main(String[] args) throws InterruptedException {
        // 創建一個容量為QUEUE_CAPACITY的阻塞隊列
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        // 創建一個無界的阻塞隊列
        BlockingQueue<String> queueMax = new LinkedBlockingQueue<>();

        // 創建一個生產者線程
        new Thread(() -> {
            while (true) {
                try {
                    // 在隊列滿時阻塞
                    queue.put("producer");
                    System.out.println("生產了一個元素,隊列中元素個數:" + queue.size());
                    Thread.sleep(PRODUCER_DELAY_MS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 創建一個消費者線程
        new Thread(() -> {
            while (true) {
                try {
                    // 在隊列為空時阻塞
                    String element = queue.take();
                    System.out.println("消費了一個元素,隊列中元素個數:" + queue.size());
                    Thread.sleep(CONSUMER_DELAY_MS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

原理

LinkedBlockingQueue內部由單鏈表實現,只能從head取元素,從tail添加元素。LinkedBlockingQueue采用兩把鎖的鎖分離技術實現入隊出隊互不阻塞,添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操作可以并行執行。

鏈表結構

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

構造方法

可以看到,當不知道隊列大小時,則默認采用Integer.MAX_VALUE作為隊列的大小。

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // 初始化head和last指針為空值節點
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

內部常量

    /** The capacity bound, or Integer.MAX_VALUE if none */
    // 隊列容量
    private final int capacity;

    /** Current number of elements */
    // 元素數量
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
     // 鏈表頭  本身是不存儲任何元素的,初始化時item指向null
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
     // 鏈表尾
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    // take鎖   鎖分離,提高效率
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    // notEmpty條件
	// 當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    // put鎖
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    // notFull條件
	// 當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒
    private final Condition notFull = putLock.newCondition();

入隊put方法

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
    	// 入隊元素為空時,拋出空指針異常
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        // 新建一個節點
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 使用put鎖進行加鎖
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            // 如果當前隊列數量和隊列最大值相等,則進行notFull等待(阻塞生產者線程)
            while (count.get() == capacity) {
                notFull.await();
            }
            // 隊列不滿時,入隊
            enqueue(node);
            // 獲取當前隊列長度,同時將當前隊列長度+1
            c = count.getAndIncrement();
            
            // 如果現隊列長度小于容量,notFull條件隊列轉同步隊列,準備喚醒一個阻塞在notFull條件上的線程(可以繼續入隊) 
        	// 這里為啥要喚醒一下呢?
        	// 因為可能有很多線程阻塞在notFull這個條件上,而取元素時只有取之前隊列是滿的才會喚醒notFull,此處不用等到取元素時才喚醒 
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
        	// put鎖解鎖,喚醒其余生產者線程入隊
            putLock.unlock();
        }
        // 如果原隊列長度為0,;立即喚醒阻塞在notEmpty上的線程
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        // 直接加到last后面,last指向入隊元素
        last = last.next = node;
    }

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        // takeLock加鎖
        takeLock.lock();
        try {
        	// notEmpty條件隊列轉同步隊列,準備喚醒阻塞在notEmpty上的線程
            notEmpty.signal();
        } finally {
			// takeLock解鎖,喚醒消費者線程
            takeLock.unlock();
        }
    }

出隊take方法

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // takeLock加鎖
        takeLock.lockInterruptibly();
        try {
        	// 隊列空的時候,則進行notEmpty等待(消費者線程阻塞)
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 獲取元素,出隊
            x = dequeue();
            // 獲取原隊列長度,同時原隊列長度-1
            c = count.getAndDecrement();
            if (c > 1)
            	// 如果原隊列長度大于1,則喚醒其余消費者線程
                notEmpty.signal();
        } finally {
        	// takeLock解鎖
            takeLock.unlock();
        }
        // 為什么隊列是滿的才喚醒阻塞在notFull上的線程呢?
    	// 因為喚醒是需要加putLock的,這是為了減少鎖的次數,所以,這里索性在放完元素就檢測一下,未滿就喚醒其它notFull上的線程,
    	// 這也是鎖分離帶來的代價
    	// 如果取之前隊列長度等于容量(已滿),則喚醒阻塞在notFull的線程
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
         // head節點本身是不存儲任何元素的
    	// 這里把head刪除,并把head下一個節點作為新的值
    	// 并把其值置空,返回原來的值
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
	        // notFull條件隊列轉同步隊列,準備喚醒阻塞在notFull上的線程
            notFull.signal();
        } finally {
        	// putLock解鎖,真正的喚醒生產者線程
            putLock.unlock();
        }
    }

help GC

原文鏈接:https://blog.csdn.net/qq_41139119/article/details/131367263

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新