首頁 > 軟體

Java RabbitMQ的工作佇列與訊息應答詳解

2022-03-08 16:01:01

Work Queues

工作佇列(任務佇列)主要思想是避免立即執行資源密集型任務,而不得不等待它完成,相反我們安排任務在之後執行。我們把任務封裝為訊息並將其傳送到佇列。在後臺執行的工作程序將彈出任務並最終執行作業。當有多個工作執行緒時,這些工作執行緒將一起處理這些任務。

其實就是生產者傳送大量的訊息,傳送到佇列之後,由多個消費者(工作執行緒)來處理訊息,並且每個訊息只能被處理一次。

1. 輪詢分發訊息

多個工作執行緒按照次序每來一個訊息執行一次。

1.1 抽取工具類

直接通過資訊獲取通道

/**
 * @Description RabbitMQ工具類
 * @date 2022/3/5 10:02
 */
public class RabbitMQUtils {
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        return connection.createChannel();
    }
}

1.2 編寫兩個工作執行緒

Work2和Work1程式碼沒有區別,只需要對它做出區分即可。

public class Worker1 {
    // 指定佇列名稱
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        // 獲取通道
        Channel channel = RabbitMQUtils.getChannel();

        // 宣告:接收訊息回撥
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("工作執行緒01:"+ new String(message.getBody()));
        };

        // 宣告:取消消費回撥
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("工作執行緒01取消接收:"+consumerTag);
        };

        System.out.println("工作執行緒01啟動完成......");

        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

1.3 編寫生產者

public class Producer {

    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();


        // 產生佇列
        channel.queueDeclare(QUEUE_NAME,false,false,true,null);

        // 訊息體
        Scanner scanner = new Scanner(System.in);
        int i = 1;
        while (scanner.hasNext()){
            String msg = scanner.next();
            msg = msg + i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("傳送成功:" + msg);
        }

        System.out.println("----------==========傳送完畢==========----------");
    }

}

1.4 執行測試

先啟動兩個工作執行緒,再啟動生產者。

出現404異常請參考下方1.6

生產者傳送情況:

輪詢狀態下兩個工作佇列接收狀態:

1.5 異常情況

在先啟動兩個消費者執行緒時,會提示404找不到佇列

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost '/', class-id=60, method-id=20)

發生這個情況的原因很顯然是因為先啟動了消費者,但是在RabbitMQ中沒有建立相對應的佇列名稱,解決方法可以:

1.先啟動生產者建立佇列(也可以在RabbitMQ中建立佇列);

2.再啟動消費者就不會產生這個錯誤;

3.再在生產者中使用Scanner類去傳送訊息測試。

2. 訊息應答

消費者在接收到訊息並且處理該訊息之後,告訴RabbitMQ它已經處理了,RabbitMQ可以刪除訊息。其目的就是為了保護訊息在被處理之前不會消失。

2.1 自動應答

這種方式傳送後就被認定為已經傳送成功,所以在訊息接收到之前消費者的連線或者channel關閉,那麼這個訊息就會丟失。其特點是消費者可以傳遞過載的訊息,對傳遞的訊息沒有限制,但如果因記憶體耗盡消費者執行緒被系統殺死,就會使得多條訊息丟失。所以這個模式需要在資料安全性和吞吐量之間選擇,適合使用在消費者可以高效並以某種速率能夠處理這些訊息的情況下使用。

所以自動應答的方式侷限性很高。

2.2 手動應答

優點:可以批次應答和減少網路擁擠。

1.channel.basicAck(long deliveryTag, boolean multiple);:肯應應答,處理完訊息之後提醒RabbitMQ可以刪除當前佇列,deliveryTag:當前佇列中選中的訊息;multiple:是否批次應答。

2.channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):否定應答,

3.channel.basicReject(long deliveryTag, boolean requeue):否定並且拒絕應答。

2.3 訊息自動重新入隊

如果消費者因為一些原因失去了對RabbitMQ的連線,導致沒有傳送ACK確認,RabbitMQ就會對該訊息進行重新排隊,並且分發給可以處理該訊息的消費者,所以即使某個消費者死亡,也可以保證訊息不會丟失。

2.4 手動應答測試

測試目的:在手動應答狀態下不會發生訊息丟失的情況。

測試方法:

1.建立兩個消費者;

2.使用工具類使執行緒睡眠一定時間;

3.在睡眠時關閉執行緒,看能否自動重新入隊。

2.4.1 生產者程式碼

/**
 * @Description 手動應答生產者
 * @date 2022/3/5 19:03
 */
public class Producer1 {

    // 指定佇列名
    private static final String TASK_QUEUE_RES = "queue_res";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_RES,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        int i = 0;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next() + i;
            channel.basicPublish("",TASK_QUEUE_RES,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("傳送訊息:'" + msg + "'成功");
        }
    }
}

2.4.2 消費者程式碼

/**
 * @Description 手動應答消費者1
 * @date 2022/3/5 19:17
 */
public class Worker1 {

    private static final String TASK_QUEUE_RES = "queue_res";

    public static void main(String[] args)  throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("執行緒A等待接收......");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 模擬並行沉睡一秒
            try {
                Thread.sleep(1000);
                System.out.println("執行緒A接收訊息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                /**
                 * basicAck:
                 *          1. 訊息標記
                 *          2. 是否批次
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        channel.basicConsume(TASK_QUEUE_RES,false,deliverCallback,
                consumerTag -> {
                    System.out.println(consumerTag + "消費者取消消費");
                });

    }
}

Worker2類和1區別不大,將名稱改成B再將睡眠事件改成30即可。

2.4.3 測試

測試方法:

1.先啟動生產者建立佇列;

2.啟動兩個消費者接收訊息;

3.因為是輪詢方式,所以A執行緒接收之後肯定是B執行緒接收,在睡眠時關閉B執行緒,如果A執行緒接收到說明測試成功。

傳送訊息:

執行緒A接收:

再傳送訊息:

關閉執行緒B執行緒A接收到訊息:

測試成功!

總結

本篇文章就到這裡了,希望能夠給你帶來幫助,也希望您能夠多多關注it145.com的更多內容!   


IT145.com E-mail:sddin#qq.com