首頁 > 軟體

python操作RabbitMq的三種工作模式

2022-04-12 13:00:17

一、簡介:

RabbitMq 是實現了高階訊息佇列協定(AMQP)的開源訊息代理中介軟體。訊息佇列是一種應用程式對應用程式的通行方式,應用程式通過寫訊息,將訊息傳遞於佇列,由另一應用程式讀取 完成通訊。而作為中介軟體的 RabbitMq 無疑是目前最流行的訊息佇列之一。

RabbitMq 應用場景廣泛:

  • 系統的高可用:日常生活當中各種商城秒殺,高流量,高並行的場景。當伺服器接收到如此大量請求處理業務時,有宕機的風險。某些業務可能極其複雜,但這部分不是高時效性,不需要立即反饋給使用者,我們可以將這部分處理請求拋給佇列,讓程式後置去處理,減輕伺服器在高並行場景下的壓力。
  • 分散式系統,整合系統,子系統之間的對接,以及架構設計中常常需要考慮訊息佇列的應用。

二、RabbitMq 生產和消費

生產者(producter):佇列訊息的產生者,負責生產訊息,並將訊息傳入佇列

import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456')  # mq使用者名稱和密碼
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立
result = channel.queue_declare(queue = 'python-test')
for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
# 向佇列插入數值 routing_key是佇列名
    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
    print(message)
connection.close()

消費者(consumer):佇列訊息的接收者,負責 接收並處理 訊息佇列中的訊息

import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 申明訊息佇列,訊息在這個佇列傳遞,如果不存在,則建立佇列
channel.queue_declare(queue = 'python-test', durable = False)
# 定義一個回撥函數來處理訊息佇列中的訊息,這裡是列印出來
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())
# 告訴rabbitmq,用callback來接收訊息
channel.basic_consume('python-test',callback)
# 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()

三、RabbitMq 持久化

MQ預設建立的是臨時 queue 和 exchange,如果不宣告持久化,一旦 rabbitmq 掛掉,queue、exchange 將會全部丟失。所以我們一般在建立 queue 或者 exchange 的時候會宣告 持久化。

1.queue 宣告持久化

# 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立。durable = True 代表訊息佇列持久化儲存,False 非持久化儲存
result = channel.queue_declare(queue = 'python-test',durable = True)

2.exchange 宣告持久化

# 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立.durable = True 代表exchange持久化儲存,False 非持久化儲存
channel.exchange_declare(exchange = 'python-test', durable = True)

注意:如果已存在一個非持久化的 queue 或 exchange ,執行上述程式碼會報錯,因為當前狀態不能更改 queue 或 exchange 儲存屬性,需要刪除重建。如果 queue 和 exchange 中一個宣告了持久化,另一個沒有宣告持久化,則不允許繫結。

3.訊息持久化

雖然 exchange 和 queue 都申明瞭持久化,但如果訊息只存在記憶體裡,rabbitmq 重啟後,記憶體裡的東西還是會丟失。所以必須宣告訊息也是持久化,從記憶體轉存到硬碟。

# 向佇列插入數值 routing_key是佇列名。delivery_mode = 2 宣告訊息在佇列中持久化,delivery_mod = 1 訊息非持久化
    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message,
                          properties=pika.BasicProperties(delivery_mode = 2))

4.acknowledgement 訊息不丟失

消費者(consumer)呼叫callback函數時,會存在處理訊息失敗的風險,如果處理失敗,則訊息丟失。但是也可以選擇消費者處理失敗時,將訊息回退給 rabbitmq ,重新再被消費者消費,這個時候需要設定確認標識。

channel.basic_consume(callback,queue = 'python-test',
# no_ack 設定成 False,在呼叫callback函數時,未收到確認標識,訊息會重回佇列。True,無論呼叫callback成功與否,訊息都被消費掉
                      no_ack = False)

四、RabbitMq 釋出與訂閱

rabbitmq 的釋出與訂閱要藉助交換機(Exchange)的原理實現:

Exchange 一共有三種工作模式:fanout, direct, topicd

模式一:fanout

這種模式下,傳遞到 exchange 的訊息將會轉發到所有與其繫結的 queue 上。

  • 不需要指定 routing_key ,即使指定了也是無效。
  • 需要提前將 exchange 和 queue 繫結,一個 exchange 可以繫結多個 queue,一個queue可以繫結多個exchange。
  • 需要先啟動 訂閱者,此模式下的佇列是 consumer 隨機生成的,釋出者 僅僅釋出訊息到 exchange ,由 exchange 轉發訊息至 queue。

釋出者:

import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456')  # mq使用者名稱和密碼
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立。durable = True 代表exchange持久化儲存,False 非持久化儲存
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
# 向佇列插入數值 routing_key是佇列名。delivery_mode = 2 宣告訊息在佇列中持久化,delivery_mod = 1 訊息非持久化。routing_key 不需要設定
    channel.basic_publish(exchange = 'python-test',routing_key = '',body = message,
                          properties=pika.BasicProperties(delivery_mode = 2))
    print(message)
connection.close()

訂閱者:

import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 建立臨時佇列,佇列名傳空字元,consumer關閉後,佇列自動刪除
result = channel.queue_declare('',exclusive=True)
# 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立。durable = True 代表exchange持久化儲存,False 非持久化儲存
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
# 繫結exchange和佇列  exchange 使我們能夠確切地指定訊息應該到哪個佇列去
channel.queue_bind(exchange = 'python-test',queue = result.method.queue)
# 定義一個回撥函數來處理訊息佇列中的訊息,這裡是列印出來
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())
channel.basic_consume(result.method.queue,callback,# 設定成 False,在呼叫callback函數時,未收到確認標識,訊息會重回佇列。True,無論呼叫callback成功與否,訊息都被消費掉
                      auto_ack = False)
channel.start_consuming()

模式二:direct

這種工作模式的原理是 訊息傳送至 exchange,exchange 根據 路由鍵(routing_key)轉發到相對應的 queue 上。

  • 可以使用預設 exchange =' ' ,也可以自定義 exchange
  • 這種模式下不需要將 exchange 和 任何進行繫結,當然繫結也是可以的。可以將 exchange 和 queue ,routing_key 和 queue 進行繫結
  • 傳遞或接受訊息時 需要 指定 routing_key
  • 需要先啟動 訂閱者,此模式下的佇列是 consumer 隨機生成的,釋出者 僅僅釋出訊息到 exchange ,由 exchange 轉發訊息至 queue。

釋出者:

import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456')  # mq使用者名稱和密碼
# 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立。durable = True 代表exchange持久化儲存,False 非持久化儲存
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')
for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
# 指定 routing_key。delivery_mode = 2 宣告訊息在佇列中持久化,delivery_mod = 1 訊息非持久化
    channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message,
                          properties=pika.BasicProperties(delivery_mode = 2))
    print(message)
connection.close()

消費者:

import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 建立臨時佇列,佇列名傳空字元,consumer關閉後,佇列自動刪除
result = channel.queue_declare('',exclusive=True)
# 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立。durable = True 代表exchange持久化儲存,False 非持久化儲存
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')
# 繫結exchange和佇列  exchange 使我們能夠確切地指定訊息應該到哪個佇列去
channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId')
# 定義一個回撥函數來處理訊息佇列中的訊息,這裡是列印出來
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())
#channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接受訊息
channel.basic_consume(result.method.queue,callback,
# 設定成 False,在呼叫callback函數時,未收到確認標識,訊息會重回佇列。True,無論呼叫callback成功與否,訊息都被消費掉
                      auto_ack = False)
channel.start_consuming()

模式三:topicd

這種模式和第二種模式差不多,exchange 也是通過 路由鍵 routing_key 來轉發訊息到指定的 queue 。 不同點是 routing_key 使用正規表示式支援模糊匹配,但匹配規則又與常規的正規表示式不同,比如“#”是匹配全部,“*”是匹配一個詞。

舉例:routing_key =“#orderid#”,意思是將訊息轉發至所有 routing_key 包含 “orderid” 字元的佇列中。程式碼和模式二 類似,就不貼出來了。

以上就是python操作RabbitMq的三種工作模式的詳細內容,更多關於python操作RabbitMq工作模式的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com