首頁 > 軟體

Java RabbitMQ的持久化和釋出確認詳解

2022-03-08 16:01:38

1. 持久化

當RabbitMQ服務停掉以後訊息生產者傳送過的訊息不丟失。預設情況下RabbitMQ退出或者崩潰時,會忽視掉佇列和訊息。為了保證訊息不丟失需要將佇列和訊息都標記為持久化。

1.1 實現持久化

1.佇列持久化:在建立佇列時將channel.queueDeclare();第二個引數改為true。

2.訊息持久化:在使用通道傳送訊息時channel.basicPublish();將第三個引數改為:MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化訊息。

/**
 * @Description 持久化MQ
 * @date 2022/3/7 9:14
 */
public class Producer3 {
    private static final String LONG_QUEUE = "long_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 持久化佇列
        channel.queueDeclare(LONG_QUEUE,true,false,false,null);
        Scanner scanner = new Scanner(System.in);
        int i = 0;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next() + i;
            // 持久化訊息
            channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("傳送訊息:'" + msg + "'成功");
        }
    }
}

但是儲存訊息還有存在一個快取的間隔點,沒有真正的寫入磁碟,永續性保證不夠強,但是對於簡單佇列而言也綽綽有餘。

1.2 不公平分發

輪詢分發的方式在消費者處理效率不同的情況下並不適用。所以真正的公平應該是遵循能者多勞的前提。

在消費者處修改channel.basicQos(1);表示開啟不公平分發

/**
 * @Description 不公平分發消費者
 * @date 2022/3/7 9:27
 */
public class Consumer2 {
    private static final String LONG_QUEUE = "long_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 模擬並行沉睡三十秒
            try {
                Thread.sleep(30000);
                System.out.println("執行緒B接收訊息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        // 設定不公平分發
        channel.basicQos(1);
        channel.basicConsume(LONG_QUEUE,false,deliverCallback,
                consumerTag -> {
                    System.out.println(consumerTag + "消費者取消消費");
                });
    }
}

1.3 測試不公平分發

測試目的:是否能實現能者多勞。

測試方法:兩個消費者睡眠不同的事件來模擬處理事件不同,如果處理時間(睡眠時間)短的能夠處理多個訊息就代表目的達成。

先啟動生產者建立佇列,再分別啟動兩個消費者。

生產者按照順序發四條訊息:

睡眠時間短的執行緒A接收到了三條訊息

而睡眠時間長的執行緒B只接收到的第二條訊息:

因為執行緒B在處理訊息時消耗的時間較長,所以就將其他訊息分配給了執行緒A。

實驗成功!

1.4 預取值

訊息的傳送和手動確認都是非同步完成的,因此就存在一個未確認訊息的緩衝區,開發人員希望能夠限制緩衝區的大小,用來避免緩衝區裡面無限制的未確認訊息問題。

這裡的預期值就值得是上述方法channel.basicQos();裡面的引數,如果在當前通道上存在等於引數的訊息就不會在安排當前通道進行消費訊息。

1.4.1 程式碼測試

測試方法:

1.新建兩個不同的消費者分別給定預期值5個2。

2.給睡眠時間長的指定為5,時間短的指定為2。

3.假如按照指定的預期值獲取訊息則表示測試成功,但並不是代表一定會按照5和2分配,這個類似於權重的判別。

程式碼根據上述程式碼修改預期值即可。

2. 釋出確認

釋出確認就是生產者釋出訊息到佇列之後,佇列確認進行持久化完畢再通知給生產者的過程。這樣才能保證訊息不會丟失。

需要注意的是需要開啟佇列持久化才能使用確認釋出。
開啟方法:channel.confirmSelect();

2.1 單個確認釋出

是一種同步釋出的方式,即傳送完一個訊息之後只有確認它確認釋出後,後續的訊息才會繼續釋出,在指定的時間內沒有確認就會丟擲異常。缺點就是特別慢。

/**
 * @Description 確認釋出——單個確認
 * @date 2022/3/7 14:49
 */
public class SoloProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_solo";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 產生佇列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 開啟確認釋出
        channel.confirmSelect();
        // 記錄開始時間
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 單個釋出確認
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("傳送訊息:" + i);
            }
        }
        // 記錄結束時間
        long endTime = System.currentTimeMillis();
        System.out.println("傳送" + MESSAGE_COUNT + "條訊息消耗:"+(endTime - beginTime) + "毫秒");   }
}

2.2 批次確認釋出

一批一批的確認釋出可以提高系統的吞吐量。但是缺點是發生故障導致釋出出現問題時,需要將整個批次處理儲存在記憶體中,後面再重新發布。

/**
 * @Description 確認釋出——批次確認
 * @date 2022/3/7 14:49
 */
public class BatchProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_batch";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 產生佇列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 開啟確認釋出
        channel.confirmSelect();
        // 設定一個多少一批確認一次。
        int batchSize = MESSAGE_COUNT / 10;
        // 記錄開始時間
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 批次釋出確認
            if (i % batchSize == 0){
                if (channel.waitForConfirms()){
                    System.out.println("傳送訊息:" + i);
                }
            }
        }
        // 記錄結束時間
        long endTime = System.currentTimeMillis();
        System.out.println("傳送" + MESSAGE_COUNT + "條訊息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

顯然效率要比單個確認釋出的高很多。

2.3 非同步確認釋出

在程式設計上比上述兩個要複雜,但是價效比很高,無論是可靠性還行效率的都好很多,利用回撥函數來達到訊息可靠性傳遞的。

/**
 * @Description 確認釋出——非同步確認
 * @date 2022/3/7 14:49
 */
public class AsyncProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_async";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 產生佇列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 開啟確認釋出
        channel.confirmSelect();
        // 記錄開始時間
        long beginTime = System.currentTimeMillis();
        // 確認成功回撥
        ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
            System.out.println("確認成功訊息:" + deliveryTab);
        };
        // 確認失敗回撥
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            System.out.println("未確認的訊息:" + deliveryTab);
        };
        // 訊息監聽器
        /**
         * addConfirmListener:
         *                  1. 確認成功的訊息;
         *                  2. 確認失敗的訊息。
         */
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
        }

        // 記錄結束時間
        long endTime = System.currentTimeMillis();
        System.out.println("傳送" + MESSAGE_COUNT + "條訊息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

2.4 處理未確認的訊息

最好的處理方式把未確認的訊息放到一個基於記憶體的能被髮布執行緒存取的佇列。

例如:ConcurrentLinkedQueue可以在確認佇列confirm callbacks與釋出執行緒之間進行訊息的傳遞。

處理方式:

1.記錄要傳送的全部訊息;

2.在釋出成功確認處刪除;

3.列印未確認的訊息。

使用一個雜湊表儲存訊息,它的優點:

可以將需要和訊息進行關聯;輕鬆批次刪除條目;支援高並行。

ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
/**
 * @Description 非同步釋出確認,處理未釋出成功的訊息
 * @date 2022/3/7 18:09
 */
public class AsyncProducerRemember {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_async_remember";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 產生佇列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 開啟確認釋出
        channel.confirmSelect();
        // 執行緒安全有序的一個hash表,適用與高並行
        ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
        // 記錄開始時間
        long beginTime = System.currentTimeMillis();
        // 確認成功回撥
        ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
            //2. 在釋出成功確認處刪除;
            // 批次刪除
            if (multiple){
                ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                confirmMap.clear();
            }else {
                // 單獨刪除
                map.remove(deliveryTab);
            }
            System.out.println("確認成功訊息:" + deliveryTab);
        };
        // 確認失敗回撥
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            // 3. 列印未確認的訊息。
            System.out.println("未確認的訊息:" + map.get(deliveryTab) + ",標記:" + deliveryTab);
        };
        // 訊息監聽器
        /**
         * addConfirmListener:
         *                  1. 確認成功的訊息;
         *                  2. 確認失敗的訊息。
         */
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 1. 記錄要傳送的全部訊息;
            map.put(channel.getNextPublishSeqNo(),msg);
        }

        // 記錄結束時間
        long endTime = System.currentTimeMillis();
        System.out.println("傳送" + MESSAGE_COUNT + "條訊息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

總結

顯然來說,非同步處理除了在編碼處有些麻煩,在處理時間效率和可用性上都是比單處理和批次處理好很多。

本篇文章就到這裡了,希望能夠給你帶來幫助,也希望您能夠多多關注it145.com的更多內容!    


IT145.com E-mail:sddin#qq.com