<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
向無限佇列新增元素的所有操作都將永遠不會阻塞(也是執行緒安全的),因此它可以增長到非常大的容量。 使用無限阻塞佇列 BlockingQueue 設計生產者 - 消費者模型時最重要的是消費者應該能夠像生產者向佇列新增訊息一樣快地消費訊息 。否則可能記憶體不足而丟擲 OutOfMemory 異常。
定義:執行緒通訊中,在任意時刻,無論並行有多高,在單個 JVM 上,同一時間永遠只有一個執行緒能對佇列進行入隊或出隊操作。BlockingQueue 可以線上程之間共用而無需任何顯式同步
阻塞佇列的型別:
JAVA中的應用場景 : 執行緒池、SpringCloud-Eureka 三級快取、Nacos、MQ、Netty 等
它們都實現了BlockingQueue介面,都有put()和take()等方法,建立方式如下:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);
新增元素:
方法 | 含義 |
---|---|
add() | 如果插入成功則返回 true,否則丟擲 IllegalStateException 異常 |
put() | 將指定的元素插入佇列,如果佇列滿了,會阻塞直到有空間插入 |
offer() | 如果插入成功則返回 true,否則返回 false |
offer(E e, long timeout, TimeUnit unit) | 嘗試將元素插入佇列,如果佇列已滿,會阻塞直到有空間插入,阻塞有時間控制 |
檢索元素:
方法 | 含義 |
---|---|
take() | 獲取佇列的頭部元素並將其刪除,如果佇列為空,則阻塞並等待元素變為可用 |
poll(long timeout, TimeUnit unit) | 檢索並刪除佇列的頭部,如有必要,等待指定的等待時間以使元素可用,如果超時,則返回 null |
實現:同步等待佇列(CLH)+ 條件等待佇列滿足條件的元素在CLH佇列中等待鎖,不滿足條件的佇列挪到條件等待佇列,滿足條件後再從 tail 插入 CLH 佇列
執行緒獲取鎖的條件: 在 CLH 佇列裡等待的 Node 節點,並且 Node 節點的前驅節點是 Singal。條件等待佇列裡的執行緒是無法獲取鎖的。
/** * 構造方法 * 還有兩個建構函式,一個無fair引數,一個可傳入集合,建立時插入佇列 * @param capacity 固定容量 * @param fair 預設是false:存取順序未指定; true:按照FIFO順序處理 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 根據fair建立對應的鎖 // 條件物件,配合容器能滿足業務 notEmpty = lock.newCondition(); // 出隊條件物件 notFull = lock.newCondition(); // 入隊條件物件 } /** * 入隊方法 * 在佇列的尾部插入指定的元素,如果佇列已滿,則等待空間可用 */ public void put(E e) throws InterruptedException { checkNotNull(e); // 檢查put物件是否為空,空丟擲異常 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 若未被中斷嘗試獲取鎖,詳見下文 try { // 佇列中元素的數量 等於 排隊元素的長度 while (count == items.length) notFull.await(); // 見下文 enqueue(e); // 元素入隊 } finally { lock.unlock(); } } /** * 出隊方法 * 獲取佇列的頭部元素並將其刪除,如果佇列為空,則阻塞並等待元素變為可用 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 見下文 try { while (count == 0) notEmpty.await(); // 見下文 return dequeue(); // 元素出隊 } finally { lock.unlock(); } }
令當前執行緒等待,直到收到訊號或被中斷詳:與此 Condition 關聯的鎖被自動釋放,進入等待,並且處於休眠狀態,直到發生以下四種情況之一:
在這些情況下,在此方法返回之前,當前執行緒必須重新獲得與此條件相關聯的鎖。當執行緒返回時,保證它持有這個鎖。
如果當前執行緒有以下兩種情況之一:
BlockingQueue 可以線上程之間共用而無需任何顯式同步,在生產者消費者之間,只需要將阻塞佇列以引數的形式進行傳遞即可。它內部的機制會自動保證執行緒的安全性。
生產者:實現了 Runnable 介面,每個生產者生產100種商品和1箇中斷標記後完成執行緒任務
@Slf4j @Slf4j public class Producer implements Runnable{ // 作為引數的阻塞佇列 private BlockingQueue<Integer> blockingQueue; private final int stopTag; /** * 構造方法 * @param blockingQueue * @param stopTag */ public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) { this.blockingQueue = blockingQueue; this.stopTag = stopTag; } @Override public void run() { try { generateNumbers(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void generateNumbers() throws InterruptedException { // 每個生產者都隨機生產10種商品 for (int i = 0; i < 10; i++) { int product = ThreadLocalRandom.current().nextInt(1000,1100); log.info("生產者{}號,生產了商品,編號為{}",Thread.currentThread().getId(),product); blockingQueue.put(product); } // 生產終止標記 blockingQueue.put(stopTag); log.info("生產者{}號,生產了第終止標記編號{}",Thread.currentThread().getId(),Thread.currentThread().getId()); } }
消費者:消費者拿到終止消費標記終止消費,否則消費商品,拿到終止標記後完成執行緒任務
@Slf4j public class Consumer implements Runnable{ // 作為引數的阻塞佇列 private BlockingQueue<Integer> queue; private final int stopTage; public Consumer(BlockingQueue<Integer> queue, int stopTage) { this.queue = queue; this.stopTage = stopTage; } @Override public void run() { try { while (true) { Integer product = queue.take(); if (product.equals(stopTage)) { log.info("{}號消費者,停止消費,因為拿到了停止消費標記",Thread.currentThread().getId()); return; } log.info("{}號消費者,拿到的商品編號:{}",Thread.currentThread().getId(),product); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
使用者端類: 建立與計算機 CPU 核數相同的執行緒數,與 16個生產者
public class ProductConsumerTest { public static void main(String[] args) { // 阻塞佇列容量 int blockingQueueSize = 10; // 生產者數量 int producerSize = 16; // 消費者數量 = 計算機執行緒核數 8 int consumerSize = Runtime.getRuntime().availableProcessors(); // 終止消費標記 int stopTag = Integer.MAX_VALUE; BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize); // 建立16個生產者執行緒 for (int i = 0; i < producerSize; i++) { new Thread(new Producer(blockingQueue, stopTag)).start(); } // 建立8個消費者執行緒 for (int j = 0; j < consumerSize; j++) { new Thread(new Consumer(blockingQueue, stopTag)).start(); } } }
定義: Java 延遲佇列提供了在指定時間才能獲取佇列元素的功能,佇列頭元素是最接近過期的元素。沒有過期元素的話,使用 poll() 方法會返回 null 值,超時判定是通過getDelay(TimeUnit.NANOSECONDS)方法的返回值小於等於 0 來判斷。延時佇列不能存放空元素。
/** * 電影票類,實現了Delayed介面,重寫 compareTo 和 getDelay方法 */ public class MovieTicket implements Delayed { //延遲時間 private final long delay; //到期時間 private final long expire; //資料 private final String msg; //建立時間 private final long now; public long getDelay() { return delay; } public long getExpire() { return expire; } public String getMsg() { return msg; } public long getNow() { return now; } /** * @param msg 訊息 * @param delay 延期時間 */ public MovieTicket(String msg , long delay) { this.delay = delay; this.msg = msg; expire = System.currentTimeMillis() + delay; //到期時間 = 當前時間+延遲時間 now = System.currentTimeMillis(); } /** * @param msg */ public MovieTicket(String msg){ this(msg,1000); } public MovieTicket(){ this(null,1000); } /** * 獲得延遲時間 用過期時間-當前時間,時間單位毫秒 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS); } /** * 用於延遲佇列內部比較排序 當前時間的延遲時間 - 比較物件的延遲時間 * 越早過期的時間在佇列中越靠前 * @param delayed * @return */ @Override public int compareTo(Delayed delayed) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - delayed.getDelay(TimeUnit.MILLISECONDS)); } }
測試類:
public static void main(String[] args) { DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>(); MovieTicket ticket = new MovieTicket("電影票1",10000); delayQueue.put(ticket); MovieTicket ticket1 = new MovieTicket("電影票2",5000); delayQueue.put(ticket1); MovieTicket ticket2 = new MovieTicket("電影票3",8000); delayQueue.put(ticket2); log.info("message:--->入隊完畢"); while( delayQueue.size() > 0 ){ try { ticket = delayQueue.take(); log.info("電影票出隊:{}",ticket.getMsg()); } catch (InterruptedException e) { e.printStackTrace(); } } }
從執行結果可以看出佇列是延遲出隊,間隔和我們所設定的時間相同
到此這篇關於Java阻塞佇列BlockingQueue詳解的文章就介紹到這了,更多相關Java BlockingQueue 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45