首頁 > 軟體

詳解RabbitMQ中死信佇列和延遲佇列的使用詳解

2022-05-28 14:00:16

簡介

本文介紹RabbitMQ的死信佇列和延遲佇列。

本內容也是Java後端面試中常見的問題。

死信佇列

簡介

DLX,全稱為Dead-Letter-Exchange,可以稱之為死信交換器,也有人稱之為死信郵箱。當訊息在一個佇列中變成死信(dead message)之後,它能被重新被傳送到另一個交換器中,這個交換器就是DLX,繫結DLX的佇列就稱之為死信佇列。

以下幾種情況會導致訊息變成死信:

  • 訊息被拒絕(Basic.Reject/Basic.Nack),並且設定requeue引數為false;
  • 訊息過期;
  • 佇列達到最大長度。

DLX是一個正常的交換器,和一般的交換器沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。當這個佇列中存在死信時,RabbitMQ就會自動地將這個訊息重新發布到設定的DLX上去,進而被路由到另一個佇列,即死信佇列。可以監聽這個佇列中的訊息以進行相應的處理,這個特性與將訊息的TTL設定為0配合使用可以彌補immediate引數的功能。

為佇列新增DLX的方法

法1:程式碼方式

//建立 DLX: dlx_exchange
channel.exchangeDeclare("dlx_exchange", "direct" );
Map<String, Object> args = new HashMap<String, Object>;
args.put("x-dead-letter-exchange", "dlx_exchange");
//為佇列myqueue新增DLX
channel.queueDeclare("myqueue", false, false, false, args);

也可以為這個DLX指定路由鍵。(如果沒有特殊指定,則使用原佇列的路由鍵)

args.put("x-dead-letter-routing-key","dlx-routing-key"); 

法2:命令方式

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"dlx_exchange"}' --apply-to queues

範例

程式碼

channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal", "fanout", true);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 10000);
args.put("x-dead-letter-exchange" , "exchange.dlx");
args.put("x-dead-letter-routing-key" , "routingkey");
channel.queueDeclare("queue.normal" , true, false, false, args);
channel.queueBind("queue.normal", "exchange.normal", "");
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx" , "routingkey");
channel.basicPublish("exchange.normal" , "rk",
MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());

這裡建立了兩個交換器exchange.normal和exchange.dlx,分別繫結兩個佇列queue.normal和queue.dlx。

Web管理頁面結果

由下圖(圖1-1)的Web管理頁面可以看出,兩個佇列都被標記了“D”,這個是durable的縮寫,即設定了佇列持久化。queue.normal這個佇列還設定了TTL、DLX和DLK,其中DLX指的是
x-dead-letter-routing-key這個屬性。 

圖1-1

案例分析

參考下圖(圖1-2),生產者首先傳送一條攜帶路由鍵為“rk”的訊息,然後經過交換器exchange.normal順利地儲存到佇列queue.normal中。由於佇列queue.normal設定了過期時間為10s,在這10s內沒有消費者消費這條訊息,那麼判定這條訊息為過期。由於設定了DLX,過期之時,訊息被丟給交換器exchange.dlx中,這時找到與exchange.dlx匹配的佇列queue.dlx,最後訊息被儲存在queue.dk這個死信佇列中。 

圖1-2

對於RabbitMQ來說,DLX是一個非常有用的特性。它可以處理異常情況下,訊息不能夠被消費者正確消費(消費者呼叫了Basic.Nack或者Basic.Reject)而被置入死信佇列中的情況,後續分析程式可以通過消費這個死信佇列中的內容來分析當時所遇到的異常情況,進而可以改善和優化系統。DLX配合TTL使用還可以實現延遲佇列的功能,詳細請看下一節。

延遲佇列

簡介

延遲佇列用來存放延遲訊息。延遲訊息:指當訊息被傳送以後,不想讓消費者立刻拿到訊息,而是等待特定時間後,消費者才能拿到這個訊息進行消費。

在AMQP協定中,或者RabbitMQ本身沒有直接支援延遲佇列的功能,但是有兩種方案來間接實現:

  • 方案1:採用rabbitmq-delayed-message-exchange 外掛實現。(RabbitMQ 3.6.x開始支援)
  • 方案2:通過前面所介紹的DLX和TTL模擬出延遲佇列的功能。

在圖1-2中,不僅展示的是死信佇列的用法,也是延遲佇列的用法,對於queue.dlx這個死信佇列來說,同樣可以看作延遲佇列。假設一個應用中需要將每條訊息都設定為10秒的延遲,

生產者通過exchange.normal這個交換器將傳送的訊息儲存在queue.normal這個佇列中。消費者訂閱的並非是queue.normal這個佇列,而是queue.dlx這個佇列。當訊息從queue.normal這個佇列中過期之後被存入queue.dlx這個佇列中,消費者就恰巧消費到了延遲10秒的這條訊息。

在真實應用中,對於延遲佇列可以根據延遲時間的長短分為多個等級,一般分為5秒、10秒、30秒、1分鐘、5分鐘、10分鐘、30分鐘、1小時這幾個維度,當然也可以再細化一下。

以下圖(圖2-1)為例進行說明。為簡化,只設定5秒、10秒、30秒、1分鐘這四個等級。根據需求的不同,生產者傳送訊息的時候通過設定不同的路由鍵,將訊息傳送到與交換器繫結的不同的佇列中。這裡佇列也分別設定了DLX和相應的死信佇列,當相應的訊息過期時,就會轉存到相應的死信佇列(即延遲佇列)中,這樣消費者根據業務自身的情況,分別選擇不同延遲等級的延遲佇列進行消費。

圖2-1

使用場景

延遲佇列的使用場景有很多,比如:

使用者下訂單場景:使用者下單後有30分鐘的時間支付,若30分鐘內沒有支付,則將這個訂單取消。

方案:使用者下單後將取消訂單的訊息傳送到延遲佇列,延遲時間設定為30分鐘。取消訂單這個訊息的訂閱者程式在30分鐘後收到訊息,判斷該訂單的狀態是否為已支付,若還沒支付,則將該訂單狀態設定為:已取消。

定時遙控場景:使用者想用手機遠端遙控家裡的智慧裝置在指定的時間工作。

方案:假設使用者想要的操作是:開啟熱水器。首先,將開啟熱水器這個訊息傳送到延遲佇列,延遲時間設定到使用者想要的時間到現在時間的差值。開啟熱水器這個訊息的訂閱者程式在指定時間收到訊息,再將指令推播到智慧裝置。

需要注意的是,延遲佇列的訊息是不能取消的,解決方案是:在消費訊息的時候判斷這個訊息對應的業務的當前狀態。例如:對於取消訂單來說,收到訊息時,讀取這個訊息所對應的資料庫資訊,如果已經是已付款狀態了,就不進行任何操作了,如果是未支付狀態,則改為已取消。

到此這篇關於詳解RabbitMQ中死信佇列和延遲佇列的使用詳解的文章就介紹到這了,更多相關RabbitMQ死信佇列 延遲佇列內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com