首頁 > 軟體

Java多執行緒之生產者消費者模式詳解

2022-03-01 19:01:26

問題:

1.什麼是阻塞佇列?如何使用阻塞佇列來實現生產者-消費者模型?

2. 生產者消費者模型的作用是什麼?

1. 生產者消費者模型

在生產者-消費者模式中,通常有兩類執行緒,即生產者執行緒(若干個)和消費者執行緒(若干個)。生產者執行緒向訊息佇列加入資料,消費者執行緒則從訊息佇列消耗資料。生產者和消費者、訊息佇列之間的關係結構圖如圖:

(1) 訊息佇列可以用來平衡生產和消費的執行緒資源;

(2) 生產者僅負責產生結果資料,不關心資料該如何處理,而消費者專心處理結果資料 ;

(3) 訊息佇列是有容量限制的,訊息佇列滿後,生產者不能再加入資料;訊息佇列空時,消費者不能再取出資料;

(4) 訊息佇列是執行緒安全的,在並行操作訊息佇列的過程中,不能出現資料不一致的情況;或者在多個執行緒並行更改共用資料後,不會造成出現髒資料的情況;

(5) JDK 中各種阻塞佇列,採用的就是這種模式;

2. 實現生產者消費者模型

1、訊息佇列中存放的訊息類:

/**
 * 訊息佇列中存放的訊息類
 */
final public class Message {
    private int id;
    private int value;
    public Message(int id,int value){
        this.id = id;
        this.value = value;
    }
    public int getId() {
        return id;
    }
    public int getValue() {
        return value;
    }
}

2、實現阻塞佇列(訊息佇列) :

import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
/**
 * 實現一個阻塞佇列(訊息佇列),實現java執行緒間通訊
 */
@Slf4j
public class MessageQueue {
    // 訊息佇列的容量
    private int capacity;
    // 訊息佇列
    LinkedList<Message> messageQueue = new LinkedList<>();
    // 設定訊息佇列的容量
    public MessageQueue(int capacity){
        this.capacity = capacity;
    }
    // 從訊息佇列中取訊息
    public Message take(){
        synchronized (messageQueue){
            // 如果訊息佇列為空
            while (messageQueue.isEmpty()){
                try {
                    log.debug("佇列為空, 消費者執行緒等待");
                    messageQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = messageQueue.removeFirst();
            log.debug("已消費訊息 {}", message);
            // 走到這,說明訊息佇列不為null
            messageQueue.notifyAll();
            return message;
        }
    }
    // 往訊息佇列中放訊息
    public void put(Message message){
        synchronized (messageQueue){
            // 如果訊息佇列已滿
            while (messageQueue.size()==capacity){
                try {
                    log.debug("佇列已滿, 生產者執行緒等待");
                    messageQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            messageQueue.addLast(message);
            log.debug("已生產訊息 {}", message);
            // 走到這,說明訊息佇列不滿
            messageQueue.notifyAll();
        }
    }
}

3、測試:

public class Main {
    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue(2);
        for(int i=0;i<3;i++){
            int id = i;
            new Thread(()->{
                queue.put(new Message(id,id));
            },"生產者").start();
        }
        new Thread(()->{
            while (true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Message message = queue.take();
            }
        },"消費者").start();
    }
}

執行結果:

15:31:28.488 [生產者] DEBUG com.example.test.MessageQueue - 已生產訊息 com.example.test.Message@54309a75
15:31:28.507 [生產者] DEBUG com.example.test.MessageQueue - 已生產訊息 com.example.test.Message@50915389
15:31:28.507 [生產者] DEBUG com.example.test.MessageQueue - 佇列已滿, 生產者執行緒等待
15:31:29.486 [消費者] DEBUG com.example.test.MessageQueue - 已消費訊息 com.example.test.Message@54309a75
15:31:29.486 [生產者] DEBUG com.example.test.MessageQueue - 已生產訊息 com.example.test.Message@6340ac12
15:31:30.487 [消費者] DEBUG com.example.test.MessageQueue - 已消費訊息 com.example.test.Message@50915389
15:31:31.487 [消費者] DEBUG com.example.test.MessageQueue - 已消費訊息 com.example.test.Message@6340ac12
15:31:32.488 [消費者] DEBUG com.example.test.MessageQueue - 佇列為空, 消費者執行緒等待

3. 生產者消費者模型的作用是什麼?

(1) 通過平衡生產者的生產能力和消費者的消費能力來提升整個系統的執行效率 ;

(2) 解耦,解耦意味著生產者和消費者之間的聯絡少,聯絡越少越可以獨自發展而不需要收到相互的制約;

總結

本篇文章就到這裡了,希望能夠給你帶來幫助,也希望您能夠多多關注it145.com的更多內容!      


IT145.com E-mail:sddin#qq.com