<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在使用RabbitMQ的時候,我們需要保證訊息不能丟失,訊息從生產者生產出來一直到消費者消費成功,這條鏈路是這樣的:
訊息的可靠投遞分為了兩大內容:傳送端的確認(p->broker和exchange->queue)和消費端的確認(queue->c)。
傳送端的確認
Rabbit提供了兩種方式來保證傳送端的訊息可靠性投遞:confirm 確認模式
和return 退回模式。
confirm 確認模式:訊息從 producer 到達 exchange 則會給 producer 傳送一個應答,我們需要開啟confirm模式,才能接收到這條應答。開啟方式是將Channel.Confirm(noWait bool)
引數設定為false
,表示同意傳送者將當前channel通道設定為confirm模式。
return 退回模式:訊息從 exchange–>queue 投遞失敗,會將訊息退回給producer。
消費端的確認
訊息從Queue傳送到消費端之後,消費端會傳送一個確認訊息:Consumer Ack
,有兩種確認方式:自動確認和手動確認。
在編碼中,關於訊息的確認方式,我們需要在消費者端呼叫Consumer
函數時,設定第三個引數:autoAck
是false還是true(false表示手動,true表示自動)。
自動確認是指,當訊息一旦被Consumer接收到,則自動確認收到,並將相應 message 從 RabbitMQ 的訊息快取中移除。
但是在實際業務處理中,很可能訊息接收到,業務處理出現異常,那麼該訊息就會丟失。如果設定了手動確認方式,則需要在業務處理成功後,呼叫ch.Ack(false)
,手動簽收,如果出現異常,則呼叫d.Reject(true)
讓其自動重新傳送訊息。
安裝API庫
Go可以使用streadway/amqp
庫來操作rabbit,使用以下命令來安裝:
go get github.com/streadway/amqp
封裝rabbitmq
接下來我們對streadway/amqp
庫的內容進行一個二次封裝,封裝為一個rabbitmq.go
檔案:
package rabbitmq import ( "encoding/json" "github.com/streadway/amqp" "log" ) // RabbitMQ RabbitMQ結構 type RabbitMQ struct { channel *amqp.Channel Name string exchange string } // Connect 連線伺服器 func Connect(s string) *RabbitMQ { //連線rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連線Rabbitmq伺服器失敗!") ch, e := conn.Channel() failOnError(e, "無法開啟頻道!") mq := new(RabbitMQ) mq.channel = ch return mq } // New 初始化訊息佇列 //第一個引數:rabbitmq伺服器的連結,第二個引數:佇列名字 func New(s string, name string) *RabbitMQ { //連線rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連線Rabbitmq伺服器失敗!") ch, e := conn.Channel() failOnError(e, "無法開啟頻道!") q, e := ch.QueueDeclare( name, //佇列名 false, //是否開啟持久化 true, //不使用時刪除 false, //排他 false, //不等待 nil, //引數 ) failOnError(e, "初始化訊息佇列失敗!") mq := new(RabbitMQ) mq.channel = ch mq.Name = q.Name return mq } // QueueDeclare 宣告queue func (q *RabbitMQ) QueueDeclare(queue string) { _, e := q.channel.QueueDeclare(queue, false, true, false, false, nil) failOnError(e, "宣告queue失敗!") } // QueueDelete 刪除queue func (q *RabbitMQ) QueueDelete(queue string) { _, e := q.channel.QueueDelete(queue, false, true, false) failOnError(e, "刪除queue失敗!") } // Qos 設定queue引數 func (q *RabbitMQ) Qos() { e := q.channel.Qos(1, 0, false) failOnError(e, "無法設定QoS") } // NewExchange 初始化交換機 //第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別 func NewExchange(s string, name string, typename string) { //連線rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連線Rabbitmq伺服器失敗!") ch, e := conn.Channel() failOnError(e, "無法開啟頻道!") e = ch.ExchangeDeclare( name, // name typename, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(e, "初始化交換機失敗!") } // ExchangeDelete 刪除交換機 func (q *RabbitMQ) ExchangeDelete(exchange string) { e := q.channel.ExchangeDelete(exchange, false, true) failOnError(e, "刪除交換機失敗!") } // Bind 繫結訊息佇列到exchange func (q *RabbitMQ) Bind(exchange string, key string) { e := q.channel.QueueBind( q.Name, key, exchange, false, nil, ) failOnError(e, "繫結佇列失敗!") q.exchange = exchange } // Send 向訊息佇列傳送訊息 //Send方法可以往某個訊息佇列傳送訊息 func (q *RabbitMQ) Send(queue string, body interface{}) { str, e := json.Marshal(body) failOnError(e, "訊息序列化失敗!") e = q.channel.Publish( "", //交換 queue, //路由鍵 false, //必填 false, //立即 amqp.Publishing{ ReplyTo: q.Name, Body: []byte(str), }) msg := "向佇列:" + q.Name + "傳送訊息失敗!" failOnError(e, msg) } // Publish 向exchange傳送訊息 //Publish方法可以往某個exchange傳送訊息 func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) { str, e := json.Marshal(body) failOnError(e, "訊息序列化失敗!") e = q.channel.Publish( exchange, key, false, false, amqp.Publishing{ReplyTo: q.Name, Body: []byte(str)}, ) failOnError(e, "向交換機傳送訊息失敗!") } // Consume 接收某個訊息佇列的訊息 func (q *RabbitMQ) Consume() <-chan amqp.Delivery { c, e := q.channel.Consume( q.Name, //指定從哪個佇列中接收訊息 "", true, false, false, false, nil, ) failOnError(e, "接收訊息失敗!") return c } // Close 關閉佇列連線 func (q *RabbitMQ) Close() { q.channel.Close() } //錯誤處理常式 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
首先初始化訊息佇列的時候,我們要開啟confirm模式
,才能接收到這條應答。開啟方式是將Channel.Confirm(noWait bool)
引數設定為false
,表示同意傳送者將當前channel通道設定為confirm模式。
func New(s string, name string) *RabbitMQ { conn, e := amqp.Dial(s) failOnError(e, "連線Rabbitmq伺服器失敗!") ch, e := conn.Channel() failOnError(e, "無法開啟頻道!") q, e := ch.QueueDeclare( name, //佇列名 false, //是否開啟持久化 true, //不使用時刪除 false, //排他 false, //不等待 nil, //引數 ) failOnError(e, "初始化訊息佇列失敗!") mq := new(RabbitMQ) mq.channel = ch mq.Name = q.Name // 設定為confirm模式 mq.channel.Confirm(false) return mq }
然後在封裝庫中建立一個函數handleConfirm()
用於接收來自Borker的回覆:
func (q *RabbitMQ) ConfirmFromBroker(ch chan amqp.Confirmation) chan amqp.Confirmation { return q.channel.NotifyPublish(ch) }
生產者
生產者端在向Broker傳送訊息的時候,我們使用一個無緩衝的通道來接收來自Broker的回覆,然後建立一個協程監聽這個無緩衝通道。
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") // 指定為topic型別 rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout") confirm := producer.ConfirmFromBroker(make(chan amqp.Confirmation)) go handleConfirm(confirm) var i int for { time.Sleep(time.Second) producer.Publish("exchange1", "fanout message: "+strconv.Itoa(i), "") i++ } } func handleConfirm(confirm <-chan amqp.Confirmation) { for { select { case message := <-confirm: fmt.Println("接收到來自Broker的回覆:", message) } } }
執行結果:
接收到來自Broker的回覆: {1 true}
接收到來自Broker的回覆: {2 true}
接收到來自Broker的回覆: {3 true}
接收到來自Broker的回覆: {4 true}
接收到來自Broker的回覆: {5 true}
首先將Consume
函數的第三個引數autoAck
引數標記為false:
// Consume 接收某個訊息佇列的訊息 func (q *RabbitMQ) Consume() <-chan amqp.Delivery { c, e := q.channel.Consume( q.Name, "", false, // 不自動確認訊息 false, false, false, nil, ) failOnError(e, "接收訊息失敗!") return c }
在消費者端我們採用公平派遣模式,即佇列傳送訊息給消費者的時候,不再採用輪詢機制,而是一個消費者消費完訊息之後,會呼叫Ack(false)
函數向佇列傳送一個回覆,佇列每次會將訊息優先傳送給消費完訊息的消費者(回覆過)。
消費端限流:
實現公平派遣模式我們需要設定消費者端一次只能消費一條訊息,之前我們已經進行了封裝,直接在消費者端呼叫即可:
// Qos 設定queue引數 func (q *RabbitMQ) Qos() { e := q.channel.Qos(1, 0, false) failOnError(e, "無法設定QoS") }
生產者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") // 指定為direct型別 rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct") i := 0 for { time.Sleep(time.Second) producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1") i = i + 1 } }
消費者1
消費者2在消費第三條訊息的時候,假設發生了錯誤,我們呼叫d.Reject(true)
函數讓佇列重新傳送訊息。
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 指定一次只消費一條訊息,直到消費完才重新接收 consumer1.Qos() // 佇列繫結到exchange consumer1.Bind("exchange", "key1") //接收訊息 msgs := consumer1.Consume() go func() { var i int for d := range msgs { time.Sleep(time.Second * 1) log.Printf("Consumer1 received a message: %s", d.Body) // 假設消費第三條訊息的時候出現了錯誤,我們就呼叫d.Reject(true),佇列會重新傳送訊息給消費者 if i == 2 { d.Reject(true) } else { // 訊息消費成功之後就回復 d.Ack(false) } i++ } }() select {} }
消費者2
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 指定一次只消費一條訊息,直到消費完才重新接收 consumer2.Qos() // 佇列繫結到exchange consumer2.Bind("exchange", "key1") //接收訊息 msgs := consumer2.Consume() go func() { for d := range msgs { time.Sleep(time.Second * 5) log.Printf("Consumer2 received a message: %s", d.Body) // 訊息消費成功之後就回復 d.Ack(false) } }() select {} }
執行結果:
# 消費者1
2022/11/06 19:55:08 Consumer1 received a message: "routing message: 0"
2022/11/06 19:55:10 Consumer1 received a message: "routing message: 2"
2022/11/06 19:55:11 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:12 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:13 Consumer1 received a message: "routing message: 4"
2022/11/06 19:55:14 Consumer1 received a message: "routing message: 6"# 消費者2
2022/11/06 19:55:13 Consumer2 received a message: "routing message: 1"
到此這篇關於RabbitMq如何做到訊息的可靠性投遞的文章就介紹到這了,更多相關RabbitMq訊息可靠性投遞內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45