首頁 > 軟體

GoLang RabbitMQ實現六種工作模式範例

2022-12-19 14:01:36

六種工作模式介紹

1.簡單(Simple)模式

P:生產者,也就是要傳送訊息的程式。

C:消費者:訊息的接收者,會一直等待訊息到來。

queue:訊息佇列,圖中紅色部分。類似一個郵箱,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息。

簡單模式就是單發單收,訊息的消費者監聽訊息佇列,如果佇列中有訊息,就消費掉,訊息被拿走後,自動從佇列中刪除。

2.工作佇列(Work Queue)模式

這種模式就是多個消費者消費同一個佇列中的訊息,既然消費者多了那麼就出現了訊息分配的問題,所以對應著兩種分配策略:

  • 公平分發:每個消費者接收訊息的概率是相等的,訊息佇列會迴圈依次給每個消費者傳送訊息,這種是預設的策略。
  • 公平派遣:保證消費者在消費完某個訊息,並行送確認資訊後,訊息佇列才會向它推播新的訊息,在此之間若是有新的訊息,將會被推播到其它消費者,若所有的消費者都在消費訊息,那麼就會等待。

3.釋出/訂閱(Pub/Sub)模式

在這種模型中,多了一個 Exchange 角色,而且過程略有變化:

P:生產者,也就是要傳送訊息的程式,但是不再傳送到佇列中,而是發給X (交換機)。

C:消費者,訊息的接收者,會一直等待訊息到來。

Queue:訊息佇列,接收訊息、快取訊息。

Exchange:交換機(X) ,一方面,接收生產者傳送的訊息。另一方面,如何處理訊息,遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄。到底如何操作,取決於Exchange的型別。 Exchange有以下4種型別:

  • Fanout:廣播,將訊息交給所有繫結到交換機的佇列。
  • Direct:全值匹配,把訊息交給符合指定routing key的佇列。
  • Topic:萬用字元,與Direct型別類似,但Direct型別要求routing key完全相等,而Topic型別是對routing key進行模糊匹配,比Direct靈活。
  • Headers:根據Message的一些頭部資訊來分發過濾Message,用的比較少。

注意:Exchange負責轉發訊息,不具備儲存訊息的能力,因此如果沒有任何佇列與Exchange繫結,或者沒有符合路由規則的佇列,那麼訊息會丟失。

4.路由(Routing)模式

路由模式其實就是上述釋出/訂閱模式的交換機轉發型別變成了Direct型別。在這種模式下:

Exchange 不再把訊息交給每一個繫結的佇列,而是根據訊息的routing key進行判斷,只有佇列的

routing key與訊息的routing key完全一致,才會接收到訊息。

P:生產者,向 Exchange 傳送訊息,傳送訊息時,會指定一個routing key

X:Exchange(交換機),接收生產者的訊息,然後把訊息遞交給與routing key完全匹配的佇列。

C1:消費者,其所在佇列指定了需要routing key為error的訊息。

C2:消費者,其所在佇列指定了需要routing key為 info、error、warning 的訊息。

5.萬用字元(Tpoic)模式

路由模式其實就是上述釋出/訂閱模式的交換機轉發型別變成了Topic型別。在這種模式下:

佇列的routing key與訊息的routing key符合匹配規則,就可以接收到訊息,有兩種規則:

*:可以(只能)匹配一個單詞。

#:可以匹配多個單詞(或者零個)。

所以圖中,routing keya.orange.b的訊息就會被轉發到Q1,而routing keyLazy.a.b.c的訊息就會被傳送到Q2。

Go語言的實現

安裝操作庫

安裝API庫

Go可以使用streadway/amqp庫來操作rabbit,使用以下命令來安裝:

go get github.com/streadway/amqp

封裝rabbitmq

接下來我們對streadway/amqp庫的內容進行一個二次封裝,封裝為一個rabbitmq.go檔案:

package rabbitmq
import (
	"encoding/json"
	"github.com/streadway/amqp"
	"log"
)
// RabbitMQ RabbitMQ結構
type RabbitMQ struct {
	channel  *amqp.Channel
	Name     string
	exchange string
}
// Connect 連線伺服器
func Connect(s string) *RabbitMQ {
	//連線rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連線Rabbitmq伺服器失敗!")
	ch, e := conn.Channel()
	failOnError(e, "無法開啟頻道!")
	mq := new(RabbitMQ)
	mq.channel = ch
	return mq
}
// New 初始化訊息佇列
//第一個引數:rabbitmq伺服器的連結,第二個引數:佇列名字
func New(s string, name string) *RabbitMQ {
	//連線rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連線Rabbitmq伺服器失敗!")
	ch, e := conn.Channel()
	failOnError(e, "無法開啟頻道!")
	q, e := ch.QueueDeclare(
		name,  //佇列名
		false, //是否開啟持久化
		true,  //不使用時刪除
		false, //排他
		false, //不等待
		nil,   //引數
	)
	failOnError(e, "初始化訊息佇列失敗!")
	mq := new(RabbitMQ)
	mq.channel = ch
	mq.Name = q.Name
	return mq
}
// QueueDeclare 宣告queue
func (q *RabbitMQ) QueueDeclare(queue string) {
	_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
	failOnError(e, "宣告queue失敗!")
}
// QueueDelete 刪除queue
func (q *RabbitMQ) QueueDelete(queue string) {
	_, e := q.channel.QueueDelete(queue, false, true, false)
	failOnError(e, "刪除queue失敗!")
}
// Qos 設定queue引數
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "無法設定QoS")
}
// NewExchange 初始化交換機
//第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別
func NewExchange(s string, name string, typename string) {
	//連線rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連線Rabbitmq伺服器失敗!")
	ch, e := conn.Channel()
	failOnError(e, "無法開啟頻道!")
	e = ch.ExchangeDeclare(
		name,     // name
		typename, // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(e, "初始化交換機失敗!")
}
// ExchangeDelete 刪除交換機
func (q *RabbitMQ) ExchangeDelete(exchange string) {
	e := q.channel.ExchangeDelete(exchange, false, true)
	failOnError(e, "刪除交換機失敗!")
}
// Bind 繫結訊息佇列到exchange
func (q *RabbitMQ) Bind(exchange string, key string) {
	e := q.channel.QueueBind(
		q.Name,
		key,
		exchange,
		false,
		nil,
	)
	failOnError(e, "繫結佇列失敗!")
	q.exchange = exchange
}
// Send 向訊息佇列傳送訊息
//Send方法可以往某個訊息佇列傳送訊息
func (q *RabbitMQ) Send(queue string, body interface{}) {
	str, e := json.Marshal(body)
	failOnError(e, "訊息序列化失敗!")
	e = q.channel.Publish(
		"",    //交換
		queue, //路由鍵
		false, //必填
		false, //立即
		amqp.Publishing{
			ReplyTo: q.Name,
			Body:    []byte(str),
		})
	msg := "向佇列:" + q.Name + "傳送訊息失敗!"
	failOnError(e, msg)
}
// Publish 向exchange傳送訊息
//Publish方法可以往某個exchange傳送訊息
func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
	str, e := json.Marshal(body)
	failOnError(e, "訊息序列化失敗!")
	e = q.channel.Publish(
		exchange,
		key,
		false,
		false,
		amqp.Publishing{ReplyTo: q.Name,
			Body: []byte(str)},
	)
	failOnError(e, "向交換機傳送訊息失敗!")
}
// Consume 接收某個訊息佇列的訊息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name, //指定從哪個佇列中接收訊息
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收訊息失敗!")
	return c
}
// Close 關閉佇列連線
func (q *RabbitMQ) Close() {
	q.channel.Close()
}
//錯誤處理常式
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

簡單(Simple)模式

生產者

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "simple")
	i := 0
	for {
		// 每隔2s傳送一次訊息
		time.Sleep(time.Second * 2)
		producer.Send("simple", " simple message: "+strconv.Itoa(i))
		i = i + 1
	}
}

消費者

func main() {
	consumer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "simple")
	//接收訊息時,指定
	messages := consumer.Consume()
	go func() {
		for ch := range messages {
			log.Printf("Received a message: %s", ch.Body)
			// 消費訊息要用3s
			time.Sleep(time.Second * 3)
		}
	}()
	select {}
}

執行結果:

2022/11/05 18:54:47 Received a message: " simple message: 0"
2022/11/05 18:54:52 Received a message: " simple message: 1"
2022/11/05 18:54:57 Received a message: " simple message: 2"

工作佇列(Work Queue)模式

公平分發模式:

公平分發模式採用的是輪詢機制,它會將數個任務按順序平均分發給消費者。

生產者

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	i := 0
	for {
		// 每隔2s傳送一次訊息
		time.Sleep(time.Second * 2)
		producer.Send("worker", " worker message: "+strconv.Itoa(i))
		i = i + 1
	}
}

消費者1

func main() {
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	//接收訊息
	messages := consumer1.Consume()
	go func() {
		for ch := range messages {
			log.Printf("Received a message: %s", ch.Body)
			// 消費訊息要用3s
			time.Sleep(time.Second * 3)
		}
	}()
	select {}
}

消費者2

func main() {
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	//接收訊息
	messages := consumer2.Consume()
	go func() {
		for ch := range messages {
			log.Printf("Received a message: %s", ch.Body)
			// 消費訊息要用3s
			time.Sleep(time.Second * 3)
		}
	}()
	select {}
}

執行結果:

# 消費者1
2022/11/05 19:45:03 Received a message: " worker message: 0"
2022/11/05 19:45:07 Received a message: " worker message: 2"
2022/11/05 19:45:11 Received a message: " worker message: 4"

# 消費者2
2022/11/05 19:45:05 Received a message: " worker message: 1"
2022/11/05 19:45:09 Received a message: " worker message: 3"
2022/11/05 19:45:13 Received a message: " worker message: 5"

可以發現,公平模式下,偶數訊息都被傳送給了消費者1,而奇數訊息都被傳送給了消費者2。

公平派遣模式:

有時候,如果訊息之間的複雜度不同,那麼不同消費者消費訊息所用的時間會不同。這個時候如果使用公平派發模式,可能會造成某一個消費者需要消費的訊息積壓過多。可以採用公平派遣模式:

公平派遣模式下傳送端與公平分發相同,消費者端只需要加一段設定程式碼,我們可以將預取計數設定為1。這告訴RabbitMQ一次不要給消費者一個以上的訊息。換句話說,在處理並確認上一條訊息之前,不要將新訊息傳送給消費者。而是將其分派給不忙的下一個消費者。

關於訊息的確認:

為了確保訊息永不丟失,RabbitMQ支援 訊息確認。消費者傳送回一個確認(acknowledgement),以告知RabbitMQ已經接收,處理了特定的訊息,並且RabbitMQ可以自由刪除它。

我們之前的程式碼中,RabbitMQ一旦向消費者傳遞了一條訊息,便立即將其標記為刪除(呼叫Consumer的第三個引數是autoAck,表示是否自動回覆)。在這種情況下,如果你終止一個消費者那麼你就可能會丟失這個任務,我們還將丟失所有已經交付給這個消費者的尚未消費的訊息。如果一個消費者意外宕機了,那麼我們希望將任務交付給其他消費者來消費者。

所以一旦向消費者傳遞了一條訊息,就不能馬上將其標記為刪除,而是要手動確認。我們需要在建立消費者的時候將autoAck引數標記為false:

// Consume 接收某個訊息佇列的訊息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name, //指定從哪個佇列中接收訊息
		"",
		false, // 不自動確認訊息
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收訊息失敗!")
	return c
}

然後每消費完一條訊息需要呼叫Ack(false)函數手動回覆。

生產者

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	i := 0
	for {
		// 每隔2s傳送一次訊息
		time.Sleep(time.Second * 2)
		producer.Send("worker", " worker message: "+strconv.Itoa(i))
		i = i + 1
	}
}

消費端限流:

實現公平派遣模式我們需要設定消費者端一次只能消費一條訊息,之前我們已經進行了封裝,直接在消費者端呼叫即可:

// Qos 設定queue引數
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "無法設定QoS")
}

消費者1

func main() {
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	// 指定一次只消費一條訊息,直到消費完才重新接收
	consumer1.Qos()
	//接收訊息
	messages := consumer1.Consume()
	go func() {
		for ch := range messages {
			log.Printf("Received a message: %s", ch.Body)
			// 消費訊息要用10s
			time.Sleep(time.Second * 10)
			// 手動回覆
			ch.Ack(false)
		}
	}()
	select {}
}

消費者2

func main() {
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	// 指定一次只消費一條訊息,直到消費完才重新接收
	consumer2.Qos()
	//接收訊息
	messages := consumer2.Consume()
	go func() {
		for ch := range messages {
			log.Printf("Received a message: %s", ch.Body)
			// 消費訊息要用2s
			time.Sleep(time.Second * 2)
			// 手動回覆
			ch.Ack(false)
		}
	}()
	select {}
}

執行結果:

# 消費者1
2022/11/05 20:31:26 Received a message: " worker message: 0"
2022/11/05 20:31:36 Received a message: " worker message: 5"

# 消費者2
2022/11/05 20:31:28 Received a message: " worker message: 1"
2022/11/05 20:31:30 Received a message: " worker message: 2"
2022/11/05 20:31:32 Received a message: " worker message: 3"
2022/11/05 20:31:34 Received a message: " worker message: 4"
2022/11/05 20:31:38 Received a message: " worker message: 6"
2022/11/05 20:31:40 Received a message: " worker message: 7"

釋出/訂閱(Pub/Sub)模式

生產者

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout")
	i := 0
	for {
		time.Sleep(time.Second)
		// fanout模式下不用routing key
		producer.Publish("exchange1", "pubsub message: "+strconv.Itoa(i), "")
		i = i + 1
	}
}

消費者1

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 佇列繫結到exchange
	consumer1.Bind("exchange1", "")
	//接收訊息
	msgs := consumer1.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer1 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

消費者2

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2")
	// 佇列繫結到exchange
	consumer2.Bind("exchange1", "")
	//接收訊息
	msgs := consumer2.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer2 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

執行結果:

# 消費者1
2022/11/05 22:32:19 Consumer1 received a message: "pubsub message: 0"
2022/11/05 22:32:20 Consumer1 received a message: "pubsub message: 1"
2022/11/05 22:32:21 Consumer1 received a message: "pubsub message: 2"
2022/11/05 22:32:22 Consumer1 received a message: "pubsub message: 3"
2022/11/05 22:32:23 Consumer1 received a message: "pubsub message: 4"
2022/11/05 22:32:24 Consumer1 received a message: "pubsub message: 5"

# 消費者2
2022/11/05 22:32:19 Consumer2 received a message: "pubsub message: 0"
2022/11/05 22:32:20 Consumer2 received a message: "pubsub message: 1"
2022/11/05 22:32:21 Consumer2 received a message: "pubsub message: 2"
2022/11/05 22:32:22 Consumer2 received a message: "pubsub message: 3"
2022/11/05 22:32:23 Consumer2 received a message: "pubsub message: 4"
2022/11/05 22:32:24 Consumer2 received a message: "pubsub message: 5"

路由(Routing)模式

生產者

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定為direct型別
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct")
	i := 0
	for {
		time.Sleep(time.Second)
		// 如果是奇數,就發key1
		// 如果是偶數,就發key2
		if i%2 != 0 {
			producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1")
		} else {
			producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key2")
		}
		i = i + 1
	}
}

消費者1

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 佇列繫結到exchange
	consumer1.Bind("exchange", "key1")
	//接收訊息
	msgs := consumer1.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer1 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

消費者2

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2")
	// 佇列繫結到exchange
	consumer2.Bind("exchange", "key2")
	//接收訊息
	msgs := consumer2.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer2 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

執行結果:

# 消費者1
2022/11/05 22:51:10 Consumer1 received a message: "routing message: 1"
2022/11/05 22:51:12 Consumer1 received a message: "routing message: 3"
2022/11/05 22:51:14 Consumer1 received a message: "routing message: 5"
2022/11/05 22:51:16 Consumer1 received a message: "routing message: 7"

# 消費者2
2022/11/05 22:51:11 Consumer2 received a message: "routing message: 0"
2022/11/05 22:51:13 Consumer2 received a message: "routing message: 2"
2022/11/05 22:51:15 Consumer2 received a message: "routing message: 4"
2022/11/05 22:51:17 Consumer2 received a message: "routing message: 6"

萬用字元(Tpoic)模式

生產者

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定為topic型別
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange2", "topic")
	var i int
	for {
		time.Sleep(time.Second)
		if i%2 != 0 {
			producer.Publish("exchange2", "topic message: "+strconv.Itoa(i), "a.test.b.c")
		} else {
			producer.Publish("exchange2", "topic message: "+strconv.Itoa(i), "a.test.b")
		}
		i++
	}
}

消費者1

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 佇列繫結到exchange
	consumer1.Bind("exchange2", "*.test.*")
	//接收訊息
	msgs := consumer1.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer1 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

消費者2

func main() {
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2")
	// 佇列繫結到exchange
	consumer2.Bind("exchange2", "#.test.#")
	//接收訊息
	msgs := consumer2.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer2 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

執行結果:

# 消費者1
2022/11/05 23:09:53 Consumer1 received a message: "topic message: 0"
2022/11/05 23:09:55 Consumer1 received a message: "topic message: 2"
2022/11/05 23:09:57 Consumer1 received a message: "topic message: 4"
2022/11/05 23:09:59 Consumer1 received a message: "topic message: 6"

# 消費者2
2022/11/05 23:09:53 Consumer2 received a message: "topic message: 0"
2022/11/05 23:09:54 Consumer2 received a message: "topic message: 1"
2022/11/05 23:09:55 Consumer2 received a message: "topic message: 2"
2022/11/05 23:09:56 Consumer2 received a message: "topic message: 3"
2022/11/05 23:09:57 Consumer2 received a message: "topic message: 4"
2022/11/05 23:09:58 Consumer2 received a message: "topic message: 5"
2022/11/05 23:09:59 Consumer2 received a message: "topic message: 6"

到此這篇關於GoLang RabbitMQ實現六種工作模式範例的文章就介紹到這了,更多相關GoLang RabbitMQ工作模式內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com