首頁 > 軟體

GoLang RabbitMQ TTL與死信佇列以及延遲佇列詳細講解

2022-12-17 14:00:42

TTL

TTL 全稱 Time To Live(存活時間/過期時間)。當訊息到達存活時間後,還沒有被消費,就會被自動清除。RabbitMQ可以設定兩種過期時間:

  • 對訊息設定過期時間。
  • 對整個佇列(Queue)設定過期時間。

如何設定

  • 設定佇列過期時間使用引數:x-message-ttl,單位:ms(毫秒),會對整個佇列訊息統一過期。
  • 設定訊息過期時間使用引數:expiration,單位:ms(毫秒),當該訊息在佇列頭部時(消費時),會單獨判斷這一訊息是否過期。

如果兩者都設定了過期時間,以時間短的為準。

在streadway/amqp庫提供的API中設定TTL

設定佇列過期時間:

QueueDeclare函數的最後一個引數是一個amqp.Table型別,它的宣告是這樣的: type Table map[string]interface{},其實是一個可以用於設定佇列屬性的map。

// 設定Queue ttl為5s
args := amqp.Table{"x-message-ttl": 5000}
q, e := ch.QueueDeclare(
		name,  //佇列名
		false, 
		true, 
		false, 
		false,
		args,   //設定Queue ttl為5s
	)

設定訊息過期時間:

e = q.channel.Publish(
		"",    
		queue, 
		false, 
		false, 
		amqp.Publishing{
			// 設定當前傳送訊息的過期時間為3s
			Expiration: "3000",
			ReplyTo:    q.Name,
			Body:       []byte(str),
})

死信佇列

當一個佇列中存在死信時,RabbitMQ會把訊息傳送給DLX(死信交換機),進而被路由到另一個佇列中,這個佇列就叫做死信佇列。

死信就是指沒有被消費者消費成功的訊息,一條訊息變成死信有三種情況:

  • 如果給訊息佇列設定了最大容量x-max-length,佇列已經滿了,後續再進來的訊息會溢位,無法被佇列接收就會變成死信。
  • 訊息接收時被拒絕會變成死信,例如呼叫Reject()函數,並設定requeuefalse
  • 如果給訊息佇列設定了訊息的過期時間x-message-ttl,或者傳送訊息時設定了當前訊息的過期時間,當訊息在佇列中的存活時間大於過期時間時,就會變成死信。

如何將死信傳送給DLX

為佇列設定引數即可,將要傳送死信的佇列設定以下兩個引數:

x-dead-letter-exchange: [DLX的名字]
x-dead-letter-routing-key: [DLX的routing key]

下面是死信佇列的工作流程:

延遲佇列

延時佇列就是用來存放需要在指定時間被處理的元素的佇列,通常可以用來處理一些具有過期性操作的業務。

比如十分鐘內未支付則取消訂單,原先這個功能我們可以使用定時器來實現,即每隔一段時間去資料庫對比未支付訂單的當前時間與訂單建立時間。但是定時器的時長難以確定,太長會導致訂單失效時間出現誤差,太短則會增巨量資料庫壓力。

實現

在RabbitMQ中沒有提供延遲佇列的功能,但是我們可以使用:TTL+死信佇列組合的方式來實現延遲佇列的效果。

下面是實現延遲佇列的流程圖:

Go實現延遲佇列

建立一個死信交換機

再建立一個死信佇列

將死信佇列繫結至死信交換機

建立一個正常佇列,並指定訊息過期後被髮往的死信交換機

生產者

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
	ch, _ := conn.Channel()
	body := "This is a delayed message, created at " + time.Now().Format("2006-01-02 15:04:05")
	fmt.Println(body)
	// 傳送訊息到queue.normal佇列中
	ch.Publish("", "queue.normal", false, false, amqp.Publishing{
		Body:       []byte(body),
		Expiration: "10000", // 設定TTL為10秒
	})
	defer conn.Close()
	defer ch.Close()
}

消費者

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
	ch, _ := conn.Channel()
	//監聽queue.dlx佇列
	msgs, _ := ch.Consume(
		"queue.dlx",
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	for d := range msgs {
		fmt.Printf("receive: %sn", d.Body) // 收到訊息,業務處理
	}
}

流程說明

生產者生產一條訊息,然後指定訊息的TTL為10s,接著將訊息發給普通佇列,訊息在普通佇列中過期後被髮往死信交換機,死信交換機將這條訊息路由給延遲佇列。消費者一直在監聽到延遲佇列中的死信後,開始消費。

到此這篇關於GoLang RabbitMQ TTL與死信佇列以及延遲佇列詳細講解的文章就介紹到這了,更多相關GoLang RabbitMQ內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com