首頁 > 軟體

RabbitMq如何做到訊息的可靠性投遞

2022-12-19 14:01:35

如何保證訊息不丟失

在使用RabbitMQ的時候,我們需要保證訊息不能丟失,訊息從生產者生產出來一直到消費者消費成功,這條鏈路是這樣的:

訊息的可靠投遞分為了兩大內容:傳送端的確認(p->broker和exchange->queue)和消費端的確認(queue->c)。

傳送端的確認

Rabbit提供了兩種方式來保證傳送端的訊息可靠性投遞:confirm 確認模式

和return 退回模式。

confirm 確認模式:訊息從 producer 到達 exchange 則會給 producer 傳送一個應答,我們需要開啟confirm模式,才能接收到這條應答。開啟方式是將Channel.Confirm(noWait bool)引數設定為false,表示同意傳送者將當前channel通道設定為confirm模式。

return 退回模式:訊息從 exchange–>queue 投遞失敗,會將訊息退回給producer。

消費端的確認

訊息從Queue傳送到消費端之後,消費端會傳送一個確認訊息:Consumer Ack,有兩種確認方式:自動確認和手動確認。

在編碼中,關於訊息的確認方式,我們需要在消費者端呼叫Consumer函數時,設定第三個引數:autoAck是false還是true(false表示手動,true表示自動)。

自動確認是指,當訊息一旦被Consumer接收到,則自動確認收到,並將相應 message 從 RabbitMQ 的訊息快取中移除。

但是在實際業務處理中,很可能訊息接收到,業務處理出現異常,那麼該訊息就會丟失。如果設定了手動確認方式,則需要在業務處理成功後,呼叫ch.Ack(false),手動簽收,如果出現異常,則呼叫d.Reject(true)讓其自動重新傳送訊息。

Go 實現

安裝操作庫

安裝API庫

Go可以使用streadway/amqp庫來操作rabbit,使用以下命令來安裝:

go get github.com/streadway/amqp

封裝rabbitmq

接下來我們對streadway/amqp庫的內容進行一個二次封裝,封裝為一個rabbitmq.go檔案:

package rabbitmq
import (
	"encoding/json"
	"github.com/streadway/amqp"
	"log"
)
// RabbitMQ RabbitMQ結構
type RabbitMQ struct {
	channel  *amqp.Channel
	Name     string
	exchange string
}
// Connect 連線伺服器
func Connect(s string) *RabbitMQ {
	//連線rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連線Rabbitmq伺服器失敗!")
	ch, e := conn.Channel()
	failOnError(e, "無法開啟頻道!")
	mq := new(RabbitMQ)
	mq.channel = ch
	return mq
}
// New 初始化訊息佇列
//第一個引數:rabbitmq伺服器的連結,第二個引數:佇列名字
func New(s string, name string) *RabbitMQ {
	//連線rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連線Rabbitmq伺服器失敗!")
	ch, e := conn.Channel()
	failOnError(e, "無法開啟頻道!")
	q, e := ch.QueueDeclare(
		name,  //佇列名
		false, //是否開啟持久化
		true,  //不使用時刪除
		false, //排他
		false, //不等待
		nil,   //引數
	)
	failOnError(e, "初始化訊息佇列失敗!")
	mq := new(RabbitMQ)
	mq.channel = ch
	mq.Name = q.Name
	return mq
}
// QueueDeclare 宣告queue
func (q *RabbitMQ) QueueDeclare(queue string) {
	_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
	failOnError(e, "宣告queue失敗!")
}
// QueueDelete 刪除queue
func (q *RabbitMQ) QueueDelete(queue string) {
	_, e := q.channel.QueueDelete(queue, false, true, false)
	failOnError(e, "刪除queue失敗!")
}
// Qos 設定queue引數
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "無法設定QoS")
}
// NewExchange 初始化交換機
//第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別
func NewExchange(s string, name string, typename string) {
	//連線rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連線Rabbitmq伺服器失敗!")
	ch, e := conn.Channel()
	failOnError(e, "無法開啟頻道!")
	e = ch.ExchangeDeclare(
		name,     // name
		typename, // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(e, "初始化交換機失敗!")
}
// ExchangeDelete 刪除交換機
func (q *RabbitMQ) ExchangeDelete(exchange string) {
	e := q.channel.ExchangeDelete(exchange, false, true)
	failOnError(e, "刪除交換機失敗!")
}
// Bind 繫結訊息佇列到exchange
func (q *RabbitMQ) Bind(exchange string, key string) {
	e := q.channel.QueueBind(
		q.Name,
		key,
		exchange,
		false,
		nil,
	)
	failOnError(e, "繫結佇列失敗!")
	q.exchange = exchange
}
// Send 向訊息佇列傳送訊息
//Send方法可以往某個訊息佇列傳送訊息
func (q *RabbitMQ) Send(queue string, body interface{}) {
	str, e := json.Marshal(body)
	failOnError(e, "訊息序列化失敗!")
	e = q.channel.Publish(
		"",    //交換
		queue, //路由鍵
		false, //必填
		false, //立即
		amqp.Publishing{
			ReplyTo: q.Name,
			Body:    []byte(str),
		})
	msg := "向佇列:" + q.Name + "傳送訊息失敗!"
	failOnError(e, msg)
}
// Publish 向exchange傳送訊息
//Publish方法可以往某個exchange傳送訊息
func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
	str, e := json.Marshal(body)
	failOnError(e, "訊息序列化失敗!")
	e = q.channel.Publish(
		exchange,
		key,
		false,
		false,
		amqp.Publishing{ReplyTo: q.Name,
			Body: []byte(str)},
	)
	failOnError(e, "向交換機傳送訊息失敗!")
}
// Consume 接收某個訊息佇列的訊息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name, //指定從哪個佇列中接收訊息
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收訊息失敗!")
	return c
}
// Close 關閉佇列連線
func (q *RabbitMQ) Close() {
	q.channel.Close()
}
//錯誤處理常式
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

傳送端的確認

首先初始化訊息佇列的時候,我們要開啟confirm模式,才能接收到這條應答。開啟方式是將Channel.Confirm(noWait bool)引數設定為false,表示同意傳送者將當前channel通道設定為confirm模式。

func New(s string, name string) *RabbitMQ {
	conn, e := amqp.Dial(s)
	failOnError(e, "連線Rabbitmq伺服器失敗!")
	ch, e := conn.Channel()
	failOnError(e, "無法開啟頻道!")
	q, e := ch.QueueDeclare(
		name,  //佇列名
		false, //是否開啟持久化
		true,  //不使用時刪除
		false, //排他
		false, //不等待
		nil,   //引數
	)
	failOnError(e, "初始化訊息佇列失敗!")
	mq := new(RabbitMQ)
	mq.channel = ch
	mq.Name = q.Name
	// 設定為confirm模式
	mq.channel.Confirm(false)
	return mq
}

然後在封裝庫中建立一個函數handleConfirm()用於接收來自Borker的回覆:

func (q *RabbitMQ) ConfirmFromBroker(ch chan amqp.Confirmation) chan amqp.Confirmation {
	return q.channel.NotifyPublish(ch)
}

生產者

生產者端在向Broker傳送訊息的時候,我們使用一個無緩衝的通道來接收來自Broker的回覆,然後建立一個協程監聽這個無緩衝通道。

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定為topic型別
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout")
	confirm := producer.ConfirmFromBroker(make(chan amqp.Confirmation))
	go handleConfirm(confirm)
	var i int
	for {
		time.Sleep(time.Second)
		producer.Publish("exchange1", "fanout message: "+strconv.Itoa(i), "")
		i++
	}
}
func handleConfirm(confirm <-chan amqp.Confirmation) {
	for {
		select {
		case message := <-confirm:
			fmt.Println("接收到來自Broker的回覆:", message)
		}
	}
}

執行結果:

接收到來自Broker的回覆: {1 true}
接收到來自Broker的回覆: {2 true}
接收到來自Broker的回覆: {3 true}
接收到來自Broker的回覆: {4 true}
接收到來自Broker的回覆: {5 true}

消費端的確認

首先將Consume函數的第三個引數autoAck引數標記為false:

// Consume 接收某個訊息佇列的訊息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name,
		"",
		false, // 不自動確認訊息
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收訊息失敗!")
	return c
}

在消費者端我們採用公平派遣模式,即佇列傳送訊息給消費者的時候,不再採用輪詢機制,而是一個消費者消費完訊息之後,會呼叫Ack(false)函數向佇列傳送一個回覆,佇列每次會將訊息優先傳送給消費完訊息的消費者(回覆過)。

消費端限流:

實現公平派遣模式我們需要設定消費者端一次只能消費一條訊息,之前我們已經進行了封裝,直接在消費者端呼叫即可:

// Qos 設定queue引數
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "無法設定QoS")
}

生產者

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定為direct型別
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct")
	i := 0
	for {
		time.Sleep(time.Second)
		producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1")
		i = i + 1
	}
}

消費者1

消費者2在消費第三條訊息的時候,假設發生了錯誤,我們呼叫d.Reject(true)函數讓佇列重新傳送訊息。

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 指定一次只消費一條訊息,直到消費完才重新接收
	consumer1.Qos()
	// 佇列繫結到exchange
	consumer1.Bind("exchange", "key1")
	//接收訊息
	msgs := consumer1.Consume()
	go func() {
		var i int
		for d := range msgs {
			time.Sleep(time.Second * 1)
			log.Printf("Consumer1 received a message: %s", d.Body)
			// 假設消費第三條訊息的時候出現了錯誤,我們就呼叫d.Reject(true),佇列會重新傳送訊息給消費者
			if i == 2 {
				d.Reject(true)
			} else {
				// 訊息消費成功之後就回復
				d.Ack(false)
			}
			i++
		}
	}()
	select {}
}

消費者2

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 指定一次只消費一條訊息,直到消費完才重新接收
	consumer2.Qos()
	// 佇列繫結到exchange
	consumer2.Bind("exchange", "key1")
	//接收訊息
	msgs := consumer2.Consume()
	go func() {
		for d := range msgs {
			time.Sleep(time.Second * 5)
			log.Printf("Consumer2 received a message: %s", d.Body)
			// 訊息消費成功之後就回復
			d.Ack(false)
		}
	}()
	select {}
}

執行結果:

# 消費者1
2022/11/06 19:55:08 Consumer1 received a message: "routing message: 0"
2022/11/06 19:55:10 Consumer1 received a message: "routing message: 2"
2022/11/06 19:55:11 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:12 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:13 Consumer1 received a message: "routing message: 4"
2022/11/06 19:55:14 Consumer1 received a message: "routing message: 6"

# 消費者2
2022/11/06 19:55:13 Consumer2 received a message: "routing message: 1"

到此這篇關於RabbitMq如何做到訊息的可靠性投遞的文章就介紹到這了,更多相關RabbitMq訊息可靠性投遞內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com