首頁 > 軟體

Java多執行緒案例之阻塞佇列詳解

2022-10-20 14:03:31

一.阻塞佇列介紹

1.1阻塞佇列特性

阻塞佇列特性:

一.安全性

二.產生阻塞效果

阻塞佇列是一種特殊的佇列. 也遵守 “先進先出” 的原則.阻塞佇列能是一種執行緒安全的資料結構, 並且具有以下特性:

  • 當佇列滿的時候, 繼續入佇列就會阻塞, 直到有其他執行緒從佇列中取走元素.
  • 當佇列空的時候, 繼續出佇列也會阻塞, 直到有其他執行緒往佇列中插入元素.

阻塞佇列的一個典型應用場景就是 “生產者消費者模型”. 這是一種非常典型的開發模型.

1.2阻塞佇列的優點

我們可以將阻塞佇列比做成"生產者"和"消費者"的"交易平臺".

我們可以把這個模型來比做成"包餃子"

A 的作用是擀餃子皮,也就是"生產者"

B 的作用是包餃子,也就是"消費者"

X 的作用一個當作放擀好餃子皮的一個盤中,也就是阻塞佇列

這樣我們根據A,B,X可以想象以下場景

場景一:

當A擀餃子皮的速度過快,X被A的杆好餃子皮放滿了,這樣A就需要停止擀餃子皮這一個操作,這時只能等待B來利用A提供的餃子皮包餃子後X所空出的空間,來給A提供生產的環境

場景二:

當B包餃子的速度過快,X被B的包餃子所用的餃子皮用空,這樣B就需要停止包餃子這一個操作,這時只能等待A提供的餃子皮包餃子後X所存在餃子皮,來給B提供消費的環境

二.生產者消費者模型

生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題

生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取

(1) 阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力.

比如在 “秒殺” 場景下, 伺服器同一時刻可能會收到大量的支付請求. 如果直接處理這些支付請求,伺服器可能扛不住(每個支付請求的處理都需要比較複雜的流程). 這個時候就可以把這些請求都放到一個阻塞佇列中, 然後再由消費者執行緒慢慢的來處理每個支付請求.這樣做可以有效進行 “削峰”, 防止伺服器被突然到來的一波請求直接沖垮.

(2) 阻塞佇列也能使生產者和消費者之間 解耦.

比如過年一家人一起包餃子. 一般都是有明確分工, 比如一個人負責擀餃子皮, 其他人負責包. 擀餃子皮的人就是 “生產者”, 包餃子的人就是 “消費者”.擀餃子皮的人不關心包餃子的人是誰(能包就行, 無論是手工包, 藉助工具, 還是機器包), 包餃子的人也不關心擀餃子皮的人是誰(有餃子皮就行, 無論是用擀麵杖擀的, 還是拿罐頭瓶擀, 還是直接從超市買的).

2.1阻塞佇列對生產者的優化

優化一:能夠讓多個伺服器程式之間更充分的解耦合:

如果不使用生產者和消費者模型,此時A和B的耦合性比較強,如果A執行緒出現一些狀況B就會掛,B執行緒出現一些狀況A就會掛,這時當我們引入阻塞佇列後我們就可以將A,B執行緒分開,如果A,B執行緒掛了有阻塞佇列的存在下,是不會影響別的執行緒

優化二:能夠對於請求進行"削峰填谷":

我們可以聯想到我國的三峽大壩,三峽大壩就相當於阻塞佇列,當我們遇到雨水大的季節,我們就可以關閉三峽大壩,利用三峽大壩來存水;當我們遇到乾旱期,我們就可以開啟三峽大壩的門,來放水解決乾旱問題

三.標準庫中的阻塞佇列

3.1Java提供阻塞佇列實現的標準類

java官方也提供了阻塞佇列的標準類,主要有下面幾個:

標準類說明
ArrayBlockingQueue一個由陣列結構組成的有界阻塞佇列
LinkedBlockingQueue一個由連結串列結構組成的有界阻塞佇列
PriorityBlockingQueue一個支援優先順序排序的無界阻塞佇列
DelayQueue一個使用優先順序佇列實現的無界阻塞佇列
SynchronousQueue一個不儲存元素的阻塞佇列
LinkedTransferQueue一個由連結串列結構組成的無界阻塞佇列
LinkedBlockingDeque一個由連結串列結構組成的雙向阻塞佇列
BlockingQueue介面單向阻塞佇列實現了該介面
BlockingDeque介面雙向阻塞佇列實現了該介面

3.2Blockingqueue基本使用

在 Java 標準庫中內建了阻塞佇列. 如果我們需要在一些程式中使用阻塞佇列, 直接使用標準庫中的即可.

BlockingQueue 是一個介面. 真正實現的類是 LinkedBlockingQueue.

put 方法用於阻塞式的入佇列, take 用於阻塞式的出佇列.

BlockingQueue 也有 offer, poll, peek 等方法, 但是這些方法不帶有阻塞特性.

BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入佇列
queue.put("abc");
// 出佇列. 如果沒有 put 直接 take, 就會阻塞.
String elem = queue.take();

四.阻塞佇列實現

4.1阻塞佇列的程式碼實現

我們通過 “迴圈佇列” 的方式來實現

使用 synchronized 進行加鎖控制.put 插入元素的時候, 判定如果佇列滿了, 就進行 wait. (注意, 要在迴圈中進行 wait. 被喚醒時不一定佇列就不滿了, 因為同時可能是喚醒了多個執行緒).take 取出元素的時候, 判定如果佇列為空, 就進行 wait. (也是迴圈 wait)

我們在設計阻塞佇列的時候可以將佇列聯想成一個圓

class BlockingQueue{
    //佇列裡存放的個數
   volatile private int size = 0;
    //佇列的頭節點
    private int head = 0;
    //佇列的尾節點
    private int prov = 0;
    //建立一個陣列,我們來給這個陣列的容量設定為100
    private int[] array = new int[100];
    //建立一個專業的鎖物件
    private Object object = new Object();
    //實現阻塞佇列中的put方法
    public void put(int value) throws InterruptedException {
        synchronized (object) {
            //當陣列已經滿了
            if (size == array.length) {
                object.wait();
            } else {
                //我們可以優化成prov = (prov + 1) % items.length
                array[prov] = value;
                prov ++;
               if (prov >= array.length) {
                   prov = 0;
               }
            }
            size++;
            object.notify();
        }
    }
    //實現阻塞佇列中的take方法
    public int take() throws InterruptedException {
        synchronized (object) {
            if (size == 0) {
                object.wait();
            }
            int x = array[head];
            head++;
            if (head >= array.length) {
                head = 0;
            }
            size--;
            object.notify();
            return x;
        }
    }
}

4.2阻塞佇列搭配生產者與消費者的程式碼實現

class BlockingQueue{
    //佇列裡存放的個數
   volatile private int size = 0;
    //佇列的頭節點
    private int head = 0;
    //佇列的尾節點
    private int prov = 0;
    //建立一個陣列,我們來給這個陣列的容量設定為100
    private int[] array = new int[100];
    //建立一個專業的鎖物件
    private Object object = new Object();
    //實現阻塞佇列中的put方法
    public void put(int value) throws InterruptedException {
        synchronized (object) {
            //當陣列已經滿了
            if (size == array.length) {
                object.wait();
            } else {
                //我們可以優化成prov = (prov + 1) % items.length
                array[prov] = value;
                prov ++;
               if (prov >= array.length) {
                   prov = 0;
               }
            }
            size++;
            object.notify();
        }
    }
    //實現阻塞佇列中的take方法
    public int take() throws InterruptedException {
        synchronized (object) {
            if (size == 0) {
                object.wait();
            }
            int x = array[head];
            head++;
            if (head >= array.length) {
                head = 0;
            }
            size--;
            object.notify();
            return x;
        }
    }
}
public class Test {
    public static void main(String[] args) {
        BlockingQueue blockingQueue = new BlockingQueue();
        Thread thread1 = new Thread(()-> {
            while (true) {
                for (int i = 0; i < 100; i++) {
                    try {
                        blockingQueue.put(i);
                        System.out.println("生產了"+i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        Thread thread2 = new Thread(()->{
                while (true) {
                    try {
                        int b = blockingQueue.take();
                        System.out.println("消耗了"+b);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
        });
        thread1.start();
        thread2.start();
    }
}

以上就是Java多執行緒案例之阻塞佇列詳解的詳細內容,更多關於Java多執行緒阻塞佇列的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com