<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
P:生產者,也就是要傳送訊息的程式。
C:消費者:訊息的接收者,會一直等待訊息到來。
queue:訊息佇列,圖中紅色部分。類似一個郵箱,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息。
簡單模式就是單發單收,訊息的消費者監聽訊息佇列,如果佇列中有訊息,就消費掉,訊息被拿走後,自動從佇列中刪除。
這種模式就是多個消費者消費同一個佇列中的訊息,既然消費者多了那麼就出現了訊息分配的問題,所以對應著兩種分配策略:
在這種模型中,多了一個 Exchange 角色,而且過程略有變化:
P:生產者,也就是要傳送訊息的程式,但是不再傳送到佇列中,而是發給X (交換機)。
C:消費者,訊息的接收者,會一直等待訊息到來。
Queue:訊息佇列,接收訊息、快取訊息。
Exchange:交換機(X) ,一方面,接收生產者傳送的訊息。另一方面,如何處理訊息,遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄。到底如何操作,取決於Exchange的型別。 Exchange有以下4種型別:
routing key
的佇列。routing key
完全相等,而Topic型別是對routing key
進行模糊匹配,比Direct靈活。注意:Exchange負責轉發訊息,不具備儲存訊息的能力,因此如果沒有任何佇列與Exchange繫結,或者沒有符合路由規則的佇列,那麼訊息會丟失。
路由模式其實就是上述釋出/訂閱模式的交換機轉發型別變成了Direct型別。在這種模式下:
Exchange 不再把訊息交給每一個繫結的佇列,而是根據訊息的routing key
進行判斷,只有佇列的
routing key
與訊息的routing key
完全一致,才會接收到訊息。
P:生產者,向 Exchange 傳送訊息,傳送訊息時,會指定一個routing key
。
X:Exchange(交換機),接收生產者的訊息,然後把訊息遞交給與routing key
完全匹配的佇列。
C1:消費者,其所在佇列指定了需要routing key
為error的訊息。
C2:消費者,其所在佇列指定了需要routing key
為 info、error、warning 的訊息。
路由模式其實就是上述釋出/訂閱模式的交換機轉發型別變成了Topic型別。在這種模式下:
佇列的routing key
與訊息的routing key
符合匹配規則,就可以接收到訊息,有兩種規則:
*
:可以(只能)匹配一個單詞。
#
:可以匹配多個單詞(或者零個)。
所以圖中,routing key
為a.orange.b
的訊息就會被轉發到Q1,而routing key
為Lazy.a.b.c
的訊息就會被傳送到Q2。
安裝API庫
Go可以使用streadway/amqp
庫來操作rabbit,使用以下命令來安裝:
go get github.com/streadway/amqp
封裝rabbitmq
接下來我們對streadway/amqp
庫的內容進行一個二次封裝,封裝為一個rabbitmq.go
檔案:
package rabbitmq import ( "encoding/json" "github.com/streadway/amqp" "log" ) // RabbitMQ RabbitMQ結構 type RabbitMQ struct { channel *amqp.Channel Name string exchange string } // Connect 連線伺服器 func Connect(s string) *RabbitMQ { //連線rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連線Rabbitmq伺服器失敗!") ch, e := conn.Channel() failOnError(e, "無法開啟頻道!") mq := new(RabbitMQ) mq.channel = ch return mq } // New 初始化訊息佇列 //第一個引數:rabbitmq伺服器的連結,第二個引數:佇列名字 func New(s string, name string) *RabbitMQ { //連線rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連線Rabbitmq伺服器失敗!") ch, e := conn.Channel() failOnError(e, "無法開啟頻道!") q, e := ch.QueueDeclare( name, //佇列名 false, //是否開啟持久化 true, //不使用時刪除 false, //排他 false, //不等待 nil, //引數 ) failOnError(e, "初始化訊息佇列失敗!") mq := new(RabbitMQ) mq.channel = ch mq.Name = q.Name return mq } // QueueDeclare 宣告queue func (q *RabbitMQ) QueueDeclare(queue string) { _, e := q.channel.QueueDeclare(queue, false, true, false, false, nil) failOnError(e, "宣告queue失敗!") } // QueueDelete 刪除queue func (q *RabbitMQ) QueueDelete(queue string) { _, e := q.channel.QueueDelete(queue, false, true, false) failOnError(e, "刪除queue失敗!") } // Qos 設定queue引數 func (q *RabbitMQ) Qos() { e := q.channel.Qos(1, 0, false) failOnError(e, "無法設定QoS") } // NewExchange 初始化交換機 //第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別 func NewExchange(s string, name string, typename string) { //連線rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連線Rabbitmq伺服器失敗!") ch, e := conn.Channel() failOnError(e, "無法開啟頻道!") e = ch.ExchangeDeclare( name, // name typename, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(e, "初始化交換機失敗!") } // ExchangeDelete 刪除交換機 func (q *RabbitMQ) ExchangeDelete(exchange string) { e := q.channel.ExchangeDelete(exchange, false, true) failOnError(e, "刪除交換機失敗!") } // Bind 繫結訊息佇列到exchange func (q *RabbitMQ) Bind(exchange string, key string) { e := q.channel.QueueBind( q.Name, key, exchange, false, nil, ) failOnError(e, "繫結佇列失敗!") q.exchange = exchange } // Send 向訊息佇列傳送訊息 //Send方法可以往某個訊息佇列傳送訊息 func (q *RabbitMQ) Send(queue string, body interface{}) { str, e := json.Marshal(body) failOnError(e, "訊息序列化失敗!") e = q.channel.Publish( "", //交換 queue, //路由鍵 false, //必填 false, //立即 amqp.Publishing{ ReplyTo: q.Name, Body: []byte(str), }) msg := "向佇列:" + q.Name + "傳送訊息失敗!" failOnError(e, msg) } // Publish 向exchange傳送訊息 //Publish方法可以往某個exchange傳送訊息 func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) { str, e := json.Marshal(body) failOnError(e, "訊息序列化失敗!") e = q.channel.Publish( exchange, key, false, false, amqp.Publishing{ReplyTo: q.Name, Body: []byte(str)}, ) failOnError(e, "向交換機傳送訊息失敗!") } // Consume 接收某個訊息佇列的訊息 func (q *RabbitMQ) Consume() <-chan amqp.Delivery { c, e := q.channel.Consume( q.Name, //指定從哪個佇列中接收訊息 "", true, false, false, false, nil, ) failOnError(e, "接收訊息失敗!") return c } // Close 關閉佇列連線 func (q *RabbitMQ) Close() { q.channel.Close() } //錯誤處理常式 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
生產者
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "simple") i := 0 for { // 每隔2s傳送一次訊息 time.Sleep(time.Second * 2) producer.Send("simple", " simple message: "+strconv.Itoa(i)) i = i + 1 } }
消費者
func main() { consumer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "simple") //接收訊息時,指定 messages := consumer.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費訊息要用3s time.Sleep(time.Second * 3) } }() select {} }
執行結果:
2022/11/05 18:54:47 Received a message: " simple message: 0"
2022/11/05 18:54:52 Received a message: " simple message: 1"
2022/11/05 18:54:57 Received a message: " simple message: 2"
公平分發模式:
公平分發模式採用的是輪詢機制,它會將數個任務按順序平均分發給消費者。
生產者
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") i := 0 for { // 每隔2s傳送一次訊息 time.Sleep(time.Second * 2) producer.Send("worker", " worker message: "+strconv.Itoa(i)) i = i + 1 } }
消費者1
func main() { consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") //接收訊息 messages := consumer1.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費訊息要用3s time.Sleep(time.Second * 3) } }() select {} }
消費者2
func main() { consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") //接收訊息 messages := consumer2.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費訊息要用3s time.Sleep(time.Second * 3) } }() select {} }
執行結果:
# 消費者1
2022/11/05 19:45:03 Received a message: " worker message: 0"
2022/11/05 19:45:07 Received a message: " worker message: 2"
2022/11/05 19:45:11 Received a message: " worker message: 4"# 消費者2
2022/11/05 19:45:05 Received a message: " worker message: 1"
2022/11/05 19:45:09 Received a message: " worker message: 3"
2022/11/05 19:45:13 Received a message: " worker message: 5"
可以發現,公平模式下,偶數訊息都被傳送給了消費者1,而奇數訊息都被傳送給了消費者2。
公平派遣模式:
有時候,如果訊息之間的複雜度不同,那麼不同消費者消費訊息所用的時間會不同。這個時候如果使用公平派發模式,可能會造成某一個消費者需要消費的訊息積壓過多。可以採用公平派遣模式:
公平派遣模式下傳送端與公平分發相同,消費者端只需要加一段設定程式碼,我們可以將預取計數設定為1。這告訴RabbitMQ一次不要給消費者一個以上的訊息。換句話說,在處理並確認上一條訊息之前,不要將新訊息傳送給消費者。而是將其分派給不忙的下一個消費者。
關於訊息的確認:
為了確保訊息永不丟失,RabbitMQ支援 訊息確認。消費者傳送回一個確認(acknowledgement),以告知RabbitMQ已經接收,處理了特定的訊息,並且RabbitMQ可以自由刪除它。
我們之前的程式碼中,RabbitMQ一旦向消費者傳遞了一條訊息,便立即將其標記為刪除(呼叫Consumer的第三個引數是autoAck,表示是否自動回覆)。在這種情況下,如果你終止一個消費者那麼你就可能會丟失這個任務,我們還將丟失所有已經交付給這個消費者的尚未消費的訊息。如果一個消費者意外宕機了,那麼我們希望將任務交付給其他消費者來消費者。
所以一旦向消費者傳遞了一條訊息,就不能馬上將其標記為刪除,而是要手動確認。我們需要在建立消費者的時候將autoAck
引數標記為false:
// Consume 接收某個訊息佇列的訊息 func (q *RabbitMQ) Consume() <-chan amqp.Delivery { c, e := q.channel.Consume( q.Name, //指定從哪個佇列中接收訊息 "", false, // 不自動確認訊息 false, false, false, nil, ) failOnError(e, "接收訊息失敗!") return c }
然後每消費完一條訊息需要呼叫Ack(false)
函數手動回覆。
生產者
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") i := 0 for { // 每隔2s傳送一次訊息 time.Sleep(time.Second * 2) producer.Send("worker", " worker message: "+strconv.Itoa(i)) i = i + 1 } }
消費端限流:
實現公平派遣模式我們需要設定消費者端一次只能消費一條訊息,之前我們已經進行了封裝,直接在消費者端呼叫即可:
// Qos 設定queue引數 func (q *RabbitMQ) Qos() { e := q.channel.Qos(1, 0, false) failOnError(e, "無法設定QoS") }
消費者1
func main() { consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") // 指定一次只消費一條訊息,直到消費完才重新接收 consumer1.Qos() //接收訊息 messages := consumer1.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費訊息要用10s time.Sleep(time.Second * 10) // 手動回覆 ch.Ack(false) } }() select {} }
消費者2
func main() { consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") // 指定一次只消費一條訊息,直到消費完才重新接收 consumer2.Qos() //接收訊息 messages := consumer2.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費訊息要用2s time.Sleep(time.Second * 2) // 手動回覆 ch.Ack(false) } }() select {} }
執行結果:
# 消費者1
2022/11/05 20:31:26 Received a message: " worker message: 0"
2022/11/05 20:31:36 Received a message: " worker message: 5"# 消費者2
2022/11/05 20:31:28 Received a message: " worker message: 1"
2022/11/05 20:31:30 Received a message: " worker message: 2"
2022/11/05 20:31:32 Received a message: " worker message: 3"
2022/11/05 20:31:34 Received a message: " worker message: 4"
2022/11/05 20:31:38 Received a message: " worker message: 6"
2022/11/05 20:31:40 Received a message: " worker message: 7"
生產者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout") i := 0 for { time.Sleep(time.Second) // fanout模式下不用routing key producer.Publish("exchange1", "pubsub message: "+strconv.Itoa(i), "") i = i + 1 } }
消費者1
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 佇列繫結到exchange consumer1.Bind("exchange1", "") //接收訊息 msgs := consumer1.Consume() go func() { for d := range msgs { log.Printf("Consumer1 received a message: %s", d.Body) d.Ack(false) } }() select {} }
消費者2
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2") // 佇列繫結到exchange consumer2.Bind("exchange1", "") //接收訊息 msgs := consumer2.Consume() go func() { for d := range msgs { log.Printf("Consumer2 received a message: %s", d.Body) d.Ack(false) } }() select {} }
執行結果:
# 消費者1
2022/11/05 22:32:19 Consumer1 received a message: "pubsub message: 0"
2022/11/05 22:32:20 Consumer1 received a message: "pubsub message: 1"
2022/11/05 22:32:21 Consumer1 received a message: "pubsub message: 2"
2022/11/05 22:32:22 Consumer1 received a message: "pubsub message: 3"
2022/11/05 22:32:23 Consumer1 received a message: "pubsub message: 4"
2022/11/05 22:32:24 Consumer1 received a message: "pubsub message: 5"# 消費者2
2022/11/05 22:32:19 Consumer2 received a message: "pubsub message: 0"
2022/11/05 22:32:20 Consumer2 received a message: "pubsub message: 1"
2022/11/05 22:32:21 Consumer2 received a message: "pubsub message: 2"
2022/11/05 22:32:22 Consumer2 received a message: "pubsub message: 3"
2022/11/05 22:32:23 Consumer2 received a message: "pubsub message: 4"
2022/11/05 22:32:24 Consumer2 received a message: "pubsub message: 5"
生產者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") // 指定為direct型別 rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct") i := 0 for { time.Sleep(time.Second) // 如果是奇數,就發key1 // 如果是偶數,就發key2 if i%2 != 0 { producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1") } else { producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key2") } i = i + 1 } }
消費者1
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 佇列繫結到exchange consumer1.Bind("exchange", "key1") //接收訊息 msgs := consumer1.Consume() go func() { for d := range msgs { log.Printf("Consumer1 received a message: %s", d.Body) d.Ack(false) } }() select {} }
消費者2
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2") // 佇列繫結到exchange consumer2.Bind("exchange", "key2") //接收訊息 msgs := consumer2.Consume() go func() { for d := range msgs { log.Printf("Consumer2 received a message: %s", d.Body) d.Ack(false) } }() select {} }
執行結果:
# 消費者1
2022/11/05 22:51:10 Consumer1 received a message: "routing message: 1"
2022/11/05 22:51:12 Consumer1 received a message: "routing message: 3"
2022/11/05 22:51:14 Consumer1 received a message: "routing message: 5"
2022/11/05 22:51:16 Consumer1 received a message: "routing message: 7"# 消費者2
2022/11/05 22:51:11 Consumer2 received a message: "routing message: 0"
2022/11/05 22:51:13 Consumer2 received a message: "routing message: 2"
2022/11/05 22:51:15 Consumer2 received a message: "routing message: 4"
2022/11/05 22:51:17 Consumer2 received a message: "routing message: 6"
生產者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") // 指定為topic型別 rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange2", "topic") var i int for { time.Sleep(time.Second) if i%2 != 0 { producer.Publish("exchange2", "topic message: "+strconv.Itoa(i), "a.test.b.c") } else { producer.Publish("exchange2", "topic message: "+strconv.Itoa(i), "a.test.b") } i++ } }
消費者1
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 佇列繫結到exchange consumer1.Bind("exchange2", "*.test.*") //接收訊息 msgs := consumer1.Consume() go func() { for d := range msgs { log.Printf("Consumer1 received a message: %s", d.Body) d.Ack(false) } }() select {} }
消費者2
func main() { //第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2") // 佇列繫結到exchange consumer2.Bind("exchange2", "#.test.#") //接收訊息 msgs := consumer2.Consume() go func() { for d := range msgs { log.Printf("Consumer2 received a message: %s", d.Body) d.Ack(false) } }() select {} }
執行結果:
# 消費者1
2022/11/05 23:09:53 Consumer1 received a message: "topic message: 0"
2022/11/05 23:09:55 Consumer1 received a message: "topic message: 2"
2022/11/05 23:09:57 Consumer1 received a message: "topic message: 4"
2022/11/05 23:09:59 Consumer1 received a message: "topic message: 6"# 消費者2
2022/11/05 23:09:53 Consumer2 received a message: "topic message: 0"
2022/11/05 23:09:54 Consumer2 received a message: "topic message: 1"
2022/11/05 23:09:55 Consumer2 received a message: "topic message: 2"
2022/11/05 23:09:56 Consumer2 received a message: "topic message: 3"
2022/11/05 23:09:57 Consumer2 received a message: "topic message: 4"
2022/11/05 23:09:58 Consumer2 received a message: "topic message: 5"
2022/11/05 23:09:59 Consumer2 received a message: "topic message: 6"
到此這篇關於GoLang RabbitMQ實現六種工作模式範例的文章就介紹到這了,更多相關GoLang RabbitMQ工作模式內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45