首頁 > 軟體

Java阻塞佇列BlockingQueue詳解

2022-07-29 22:05:38

佇列的型別

  • 無限佇列(unbounded queue) 無容量限定,只隨儲存變化
  • 有限佇列(bounded queue) 定義了最大容量

向無限佇列新增元素的所有操作都將永遠不會阻塞(也是執行緒安全的),因此它可以增長到非常大的容量。 使用無限阻塞佇列 BlockingQueue 設計生產者 - 消費者模型時最重要的是消費者應該能夠像生產者向佇列新增訊息一樣快地消費訊息 。否則可能記憶體不足而丟擲 OutOfMemory 異常。

資料結構

  • 1.通常使用連結串列或陣列實現
  • 2.一般具有 FIFO(先進先出) 特性,也可以設計為雙端佇列
  • 3.佇列的主要操作:入隊和出隊

阻塞佇列 BlockingQueue

定義:執行緒通訊中,在任意時刻,無論並行有多高,在單個 JVM 上,同一時間永遠只有一個執行緒能對佇列進行入隊或出隊操作。BlockingQueue 可以線上程之間共用而無需任何顯式同步

阻塞佇列的型別:  

JAVA中的應用場景 : 執行緒池、SpringCloud-Eureka 三級快取、Nacos、MQ、Netty 等

常見的阻塞佇列

  • ArrayBlockingQueue : 由陣列支援的有界佇列
    • 應用場景: 執行緒池中有比較多的應用、生產者消費者模型
    • 工作原理: 基於 ReentrantLock 保證執行緒安全,根據Condition實現佇列滿時的阻塞
  • LinkedBlockingQueue : 基於連結串列的無界佇列(理論上有界)
  • PriorityBlockingQueue : 由優先順序堆支援的無界優先順序佇列
  • DelayQueue : 由優先順序堆支援的、基於時間的排程佇列,內部基於無界佇列PriorityQueue 實現,而無界佇列基於陣列的擴容實現
    • 使用方法: 入隊的物件必須要實現 Delayed 介面,而 Delayed 整合自 Comparable 介面
    • 應用場景: 售賣電影票等
    • 工作原理: 佇列內部會根據時間優先順序進行排序。延遲類執行緒池週期執行。

它們都實現了BlockingQueue介面,都有put()和take()等方法,建立方式如下:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);

BlockingQueue API

新增元素:

方法含義
add()如果插入成功則返回 true,否則丟擲 IllegalStateException 異常
put()將指定的元素插入佇列,如果佇列滿了,會阻塞直到有空間插入
offer()如果插入成功則返回 true,否則返回 false
offer(E e, long timeout, TimeUnit unit)嘗試將元素插入佇列,如果佇列已滿,會阻塞直到有空間插入,阻塞有時間控制

檢索元素:

方法含義
take()獲取佇列的頭部元素並將其刪除,如果佇列為空,則阻塞並等待元素變為可用
poll(long timeout, TimeUnit unit)檢索並刪除佇列的頭部,如有必要,等待指定的等待時間以使元素可用,如果超時,則返回 null

ArrayBlockingQueue 原始碼簡解

實現:同步等待佇列(CLH)+ 條件等待佇列滿足條件的元素在CLH佇列中等待鎖,不滿足條件的佇列挪到條件等待佇列,滿足條件後再從 tail 插入 CLH 佇列

執行緒獲取鎖的條件: 在 CLH 佇列裡等待的 Node 節點,並且 Node 節點的前驅節點是 Singal。條件等待佇列裡的執行緒是無法獲取鎖的。

/**
 * 構造方法
 * 還有兩個建構函式,一個無fair引數,一個可傳入集合,建立時插入佇列
 * @param capacity 固定容量
 * @param fair 預設是false:存取順序未指定; true:按照FIFO順序處理
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
   if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); // 根據fair建立對應的鎖
    // 條件物件,配合容器能滿足業務
    notEmpty = lock.newCondition(); // 出隊條件物件
    notFull =  lock.newCondition(); // 入隊條件物件
}
/**
 * 入隊方法
 * 在佇列的尾部插入指定的元素,如果佇列已滿,則等待空間可用
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e); // 檢查put物件是否為空,空丟擲異常
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 若未被中斷嘗試獲取鎖,詳見下文
    try {
        // 佇列中元素的數量 等於 排隊元素的長度
        while (count == items.length)
            notFull.await(); // 見下文
        enqueue(e); // 元素入隊
    } finally {
        lock.unlock();
    }
}
/**
 * 出隊方法
 * 獲取佇列的頭部元素並將其刪除,如果佇列為空,則阻塞並等待元素變為可用
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 見下文
    try {
        while (count == 0)
            notEmpty.await(); // 見下文
        return dequeue(); // 元素出隊
    } finally {
        lock.unlock();
    }
}

令當前執行緒等待,直到收到訊號或被中斷詳:與此 Condition 關聯的鎖被自動釋放,進入等待,並且處於休眠狀態,直到發生以下四種情況之一:

  • ①其他執行緒呼叫這個Condition的 signal 方法,當前執行緒恰好被選為要被喚醒的執行緒;
  • ②其他執行緒呼叫這個條件的 signalAll 方法
  • ③其他執行緒中斷當前執行緒,支援中斷執行緒掛起;
  • ④一個“虛假的喚醒”發生了。

在這些情況下,在此方法返回之前,當前執行緒必須重新獲得與此條件相關聯的鎖。當執行緒返回時,保證它持有這個鎖。

如果當前執行緒有以下兩種情況之一:

  • ①在進入該方法時設定中斷狀態;
  • ②在等待時被中斷,支援執行緒掛起的中斷 丟擲InterruptedException

生產者消費者模式

BlockingQueue 可以線上程之間共用而無需任何顯式同步,在生產者消費者之間,只需要將阻塞佇列以引數的形式進行傳遞即可。它內部的機制會自動保證執行緒的安全性。

生產者:實現了 Runnable 介面,每個生產者生產100種商品和1箇中斷標記後完成執行緒任務

@Slf4j
@Slf4j
public class Producer implements Runnable{
    // 作為引數的阻塞佇列
    private BlockingQueue<Integer> blockingQueue;
    private final int stopTag;
    /**
     * 構造方法
     * @param blockingQueue
     * @param stopTag
     */
    public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) {
        this.blockingQueue = blockingQueue;
        this.stopTag = stopTag;
    }
    @Override
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
   private void generateNumbers() throws InterruptedException {
        // 每個生產者都隨機生產10種商品
        for (int i = 0; i < 10; i++) {
            int product = ThreadLocalRandom.current().nextInt(1000,1100);
            log.info("生產者{}號,生產了商品,編號為{}",Thread.currentThread().getId(),product);
            blockingQueue.put(product);
        }
        // 生產終止標記
        blockingQueue.put(stopTag);
        log.info("生產者{}號,生產了第終止標記編號{}",Thread.currentThread().getId(),Thread.currentThread().getId());
    }
}

消費者:消費者拿到終止消費標記終止消費,否則消費商品,拿到終止標記後完成執行緒任務

@Slf4j
public class Consumer implements Runnable{
    // 作為引數的阻塞佇列
    private BlockingQueue<Integer> queue;
    private final int stopTage;
    public Consumer(BlockingQueue<Integer> queue, int stopTage) {
        this.queue = queue;
        this.stopTage = stopTage;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Integer product = queue.take();
                if (product.equals(stopTage)) {
                    log.info("{}號消費者,停止消費,因為拿到了停止消費標記",Thread.currentThread().getId());
                    return;
                }
                log.info("{}號消費者,拿到的商品編號:{}",Thread.currentThread().getId(),product);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

使用者端類: 建立與計算機 CPU 核數相同的執行緒數,與 16個生產者

public class ProductConsumerTest {
    public static void main(String[] args) {
        // 阻塞佇列容量
        int blockingQueueSize = 10;
        // 生產者數量
        int producerSize = 16;
        // 消費者數量 = 計算機執行緒核數 8
        int consumerSize = Runtime.getRuntime().availableProcessors();
        // 終止消費標記
        int stopTag = Integer.MAX_VALUE;
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);
        // 建立16個生產者執行緒
        for (int i = 0; i < producerSize; i++) {
            new Thread(new Producer(blockingQueue, stopTag)).start();
        }
        // 建立8個消費者執行緒
        for (int j = 0; j < consumerSize; j++) {
            new Thread(new Consumer(blockingQueue, stopTag)).start();
        }
    }
}

延遲佇列 DelayQueue

定義: Java 延遲佇列提供了在指定時間才能獲取佇列元素的功能,佇列頭元素是最接近過期的元素。沒有過期元素的話,使用 poll() 方法會返回 null 值,超時判定是通過getDelay(TimeUnit.NANOSECONDS)方法的返回值小於等於 0 來判斷。延時佇列不能存放空元素。

/**
 * 電影票類,實現了Delayed介面,重寫 compareTo 和 getDelay方法
 */
public class MovieTicket implements Delayed {
    //延遲時間
    private final long delay;
    //到期時間
    private final long expire;
    //資料
    private final String msg;
    //建立時間
    private final long now;
    public long getDelay() {
        return delay;
    }
    public long getExpire() {
        return expire;
    }
    public String getMsg() {
        return msg;
    }
    public long getNow() {
        return now;
    }
    /**
     * @param msg 訊息
     * @param delay 延期時間
     */
    public MovieTicket(String msg , long delay) {
        this.delay = delay;
        this.msg = msg;
        expire = System.currentTimeMillis() + delay;    //到期時間 = 當前時間+延遲時間
        now = System.currentTimeMillis();
    }
    /**
     * @param msg
     */
    public MovieTicket(String msg){
        this(msg,1000);
    }
    public MovieTicket(){
        this(null,1000);
    }
    /**
     * 獲得延遲時間   用過期時間-當前時間,時間單位毫秒
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire
                - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }
    /**
     * 用於延遲佇列內部比較排序  當前時間的延遲時間 - 比較物件的延遲時間
     * 越早過期的時間在佇列中越靠前
     * @param delayed
     * @return
     */
    @Override
    public int compareTo(Delayed delayed) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS)
                - delayed.getDelay(TimeUnit.MILLISECONDS));
    }
}

測試類:

public static void main(String[] args) {
    DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();
    MovieTicket ticket = new MovieTicket("電影票1",10000);
    delayQueue.put(ticket);
    MovieTicket ticket1 = new MovieTicket("電影票2",5000);
    delayQueue.put(ticket1);
    MovieTicket ticket2 = new MovieTicket("電影票3",8000);
    delayQueue.put(ticket2);
    log.info("message:--->入隊完畢");
    while( delayQueue.size() > 0 ){
        try {
            ticket = delayQueue.take();
            log.info("電影票出隊:{}",ticket.getMsg());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

從執行結果可以看出佇列是延遲出隊,間隔和我們所設定的時間相同

到此這篇關於Java阻塞佇列BlockingQueue詳解的文章就介紹到這了,更多相關Java BlockingQueue 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com