首頁 > 軟體

python對RabbitMQ的簡單入門使用教學

2022-06-27 10:00:40

(一)RabbitMQ的簡介

RabbitMq 是實現了高階訊息佇列協定(AMQP)的開源訊息代理中介軟體。訊息佇列是一種應用程式對應用程式的通行方式,應用程式通過寫訊息,將訊息傳遞於佇列,由另一應用程式讀取 完成通訊。而作為中介軟體的 RabbitMq 無疑是目前最流行的訊息佇列之一。目前使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

RabbitMQ總體架構

PS:生產者和消費者可能在不同的程式或主機中,當然也有可能一個程式有可能既是生產者,也是消費者。

RabbitMq 應用場景廣泛:

1.系統的高可用:日常生活當中各種商城秒殺,高流量,高並行的場景。當伺服器接收到如此大量請求處理業務時,有宕機的風險。某些業務可能極其複雜,但這部分不是高時效性,不需要立即反饋給使用者,我們可以將這部分處理請求拋給佇列,讓程式後置去處理,減輕伺服器在高並行場景下的壓力。

2.分散式系統,整合系統,子系統之間的對接,以及架構設計中常常需要考慮訊息佇列的應用。

(二)RabbitMQ的安裝

apt-get update
apt-get install erlang
apt-get install rabbitmq-server


#啟動rabbitmq: service rabbitmq-server start
#停止rabbitmq: service rabbitmq-server stop
#重啟rabbitmq: service rabbitmq-server restart

#啟動rabbitmq外掛:rabbitmq-plugins enable rabbitmq_management

啟用rabbitmq_management外掛後就可以登入後臺管理頁面了,瀏覽器輸入ip:15672

自帶的密碼和使用者名稱都是guest,但是隻能本機登入

所以下面我們新增新使用者,和自定義許可權

#新增新使用者
rabbitmqctl add_user 使用者名稱 密碼

#給指定使用者新增管理員許可權
rabbitmqctl set_user_tags 使用者名稱 administrator

給使用者新增許可權
rabbitmqctl set_permissions -p / 使用者名稱 ".*" ".*" ".*"

在web頁面輸入使用者名稱,和密碼

(三)python操作RabbitMQ

python中使用pika操作RabbitMQ

pip install pika
#皮卡皮卡,哈哈

(四)RabbitMQ簡單模式

上程式碼

# coding=utf-8
### 生產者

import pika
import time

user_info = pika.PlainCredentials('root', 'root')#使用者名稱和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))#連線伺服器上的RabbitMQ服務

# 建立一個channel
channel = connection.channel()

# 如果指定的queue不存在,則會建立一個queue,如果已經存在 則不會做其他動作,官方推薦,每次使用時都可以加上這句
channel.queue_declare(queue='hello')

for i in range(0, 100):
    channel.basic_publish(exchange='',#當前是一個簡單模式,所以這裡設定為空字串就可以了
                          routing_key='hello',# 指定訊息要傳送到哪個queue
                          body='{}'.format(i)# 指定要傳送的訊息
                          )
    time.sleep(1)
    
# 關閉連線
# connection.close()

PS:RabbitMQ中所有的訊息都要先通過交換機,空字串表示使用預設的交換機

# coding=utf-8
### 消費者

import pika

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 如果指定的queue不存在,則會建立一個queue,如果已經存在 則不會做其他動作,生產者和消費者都做這一步的好處是
# 這樣生產者和消費者就沒有必要的先後啟動順序了
channel.queue_declare(queue='hello')


# 回撥函數
def callback(ch, method, properties, body):
    print('消費者收到:{}'.format(body))

# channel: 包含channel的一切屬性和方法
# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通過 properties 傳入的引數
# body: basic_publish傳送的訊息


channel.basic_consume(queue='hello',  # 接收指定queue的訊息
                      auto_ack=True,  # 指定為True,表示訊息接收到後自動給訊息傳送方回覆確認,已收到訊息
                      on_message_callback=callback  # 設定收到訊息的回撥函數
                      )

print('Waiting for messages. To exit press CTRL+C')

# 一直處於等待接收訊息的狀態,如果沒收到訊息就一直處於阻塞狀態,收到訊息就呼叫上面的回撥函數
channel.start_consuming()

對於上面的這種模式,有一下兩個不好的地方:

一個是在我們的消費者還沒開始消費完佇列裡的訊息,如果這時rabbitmq服務掛了,那麼訊息佇列裡的訊息將會全部丟失,解決方法是在宣告佇列時,宣告佇列為可持久化儲存佇列,並且在生產者在將訊息插入到訊息佇列時,設定訊息持久化儲存,具體如下

# coding=utf-8
### 生產者
import pika
import time

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))

# 建立一個channel
channel = connection.channel()

# 如果指定的queue不存在,則會建立一個queue,如果已經存在 則不會做其他動作,官方推薦,每次使用時都可以加上這句
channel.queue_declare(queue='durable_queue',durable=True)
#PS:這裡不同種佇列不允許名字相同

for i in range(0, 100):
    channel.basic_publish(exchange='',
                          routing_key='durable_queue',
                          body='{}'.format(i),
                          properties=pika.BasicProperties(delivery_mode=2)
                          )
# 關閉連線
# connection.close()

消費者與上面的消費者沒有什麼不同,具體的就是消費宣告的佇列,也要是可持久化的佇列,還有就是,即使在生產者插入訊息時,設定當前訊息持久化儲存(properties=pika.BasicProperties(delivery_mode=2)),並不能百分百保證訊息真的被持久化,因為RabbitMQ掛掉的時候它可能還儲存在快取中,沒來得及同步到磁碟中

在生產者插入訊息後,立刻停止rabbitmq,並重新啟動,其實我們在web管理頁面也可看到未被消費的資訊,當然在啟動消費者後也成功接收到了訊息

上面說的第二點不好就是,如果在消費者獲取到佇列裡的訊息後,在回撥函數的處理過程中,消費者突然出錯或程式崩潰等異常,那麼就會造成這條訊息並未被實際正常的處理掉。為了解決這個問題,我們只需在消費者basic_consume(auto_ack=False),並在回撥函數中設定手動應答即可ch.basic_ack(delivery_tag=method.delivery_tag),具體如下

# coding=utf-8
### 消費者

import pika
import time

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 如果指定的queue不存在,則會建立一個queue,如果已經存在 則不會做其他動作,生產者和消費者都做這一步的好處是
# 這樣生產者和消費者就沒有必要的先後啟動順序了
channel.queue_declare(queue='queue')


# 回撥函數
def callback(ch, method, properties, body):
    time.sleep(5)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('消費者收到:{}'.format(body.decode('utf-8')))


# channel: 包含channel的一切屬性和方法
# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通過 properties 傳入的引數
# body: basic_publish傳送的訊息


channel.basic_consume(queue='queue',  # 接收指定queue的訊息
                      auto_ack=False,  # 指定為False,表示取消自動應答,交由回撥函數手動應答
                      on_message_callback=callback  # 設定收到訊息的回撥函數
                      )

# 應答的本質是告訴訊息佇列可以將這條訊息銷燬了

print('Waiting for messages. To exit press CTRL+C')

# 一直處於等待接收訊息的狀態,如果沒收到訊息就一直處於阻塞狀態,收到訊息就呼叫上面的回撥函數
channel.start_consuming()

這裡只需要設定消費者,生產者並不要修改

還有就是在上的使用方式在,都是一個生產者和一個消費者,還有一種情況就是,一個生產者和多個消費者,即多個消費者同時監聽一個訊息佇列,這時候佇列裡的訊息就是輪詢分發(即如果訊息佇列裡有100條資訊,如果有2個消費者,那麼每個就會收到50條資訊),但是在某些情況下,不同的消費者處理任務的能力是不同的,這時還按照輪詢的方式分發訊息並不是很合理,那麼只需要再配合手動應答的方式,設定消費者接收的訊息沒有處理完,佇列就不要給我放送新的訊息即可,具體設定方式如下:

# coding=utf-8
### 消費者

import pika
import time

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 如果指定的queue不存在,則會建立一個queue,如果已經存在 則不會做其他動作,生產者和消費者都做這一步的好處是
# 這樣生產者和消費者就沒有必要的先後啟動順序了
channel.queue_declare(queue='queue')


# 回撥函數
def callback(ch, method, properties, body):
    time.sleep(0)#通過設定休眠時間來模擬不同消費者的處理時間
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('消費者收到:{}'.format(body.decode('utf-8')))


# prefetch_count表示接收的訊息數量,當我接收的訊息沒有處理完(用basic_ack標記訊息已處理完畢)之前不會再接收新的訊息了
channel.basic_qos(prefetch_count=1)  # 還有就是這個設定必須在basic_consume之上,否則不生效

channel.basic_consume(queue='queue',  # 接收指定queue的訊息
                      auto_ack=False,  # 指定為False,表示取消自動應答,交由回撥函數手動應答
                      on_message_callback=callback  # 設定收到訊息的回撥函數
                      )

# 應答的本質是告訴訊息佇列可以將這條訊息銷燬了

print('Waiting for messages. To exit press CTRL+C')

# 一直處於等待接收訊息的狀態,如果沒收到訊息就一直處於阻塞狀態,收到訊息就呼叫上面的回撥函數
channel.start_consuming()

PS:這種情況必須關閉自動應答ack,改成手動應答。使用basicQos(perfetch=1)限制每次只傳送不超過1條訊息到同一個消費者,消費者必須手動反饋告知佇列,才會傳送下一個

(五)RabbitMQ釋出訂閱模式

釋出訂閱會將訊息傳送給所有的訂閱者,而訊息佇列中的資料被消費一次便消失。所以,RabbitMQ實現釋出和訂閱時,會為每一個訂閱者建立一個佇列,而釋出者釋出訊息時,會將訊息放置在所有相關佇列中

這個模式中會引入交換機的概念,其實在RabbitMQ中,所有的生產者都不會直接把訊息傳送到佇列中,甚至生產者都不知道訊息在發出後有沒有傳送到queue中,事實上,生產者只能將訊息傳送給交換機,由交換機來決定傳送到哪個佇列中。

交換機的一端用來從生產者中接收訊息,另一端用來傳送訊息到佇列,交換機的型別規定了怎麼處理接收到的訊息,釋出訂閱模式使用到的交換機型別為 fanout ,這種交換機型別非常簡單,就是將接收到的訊息廣播給已知的(即繫結到此交換機的)所有消費者。

當然,如果不想使用特定的交換機,可以使用 exchange=’’ 表示使用預設的交換機,預設的交換機會將訊息傳送到 routing_key 指定的queue,可以參考簡單模式。

上程式碼:

#生產者
import pika

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 建立一個指定名稱的交換機,並指定型別為fanout,用於將接收到的訊息廣播到所有queue中
channel.exchange_declare(exchange='交換機', exchange_type='fanout')

# 將訊息傳送給指定的交換機,在fanout型別中,routing_key=''表示不用傳送到指定queue中,
# 而是將傳送到繫結到此交換機的所有queue
channel.basic_publish(exchange='交換機', routing_key='', body='這是一條測試訊息')
#消費者
import pika

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

channel.exchange_declare(exchange='交換機', exchange_type='fanout')

# 使用RabbitMQ給自己生成一個專有的queue
result = channel.queue_declare(queue='333')
# result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 這裡如果設定exclusive=True引數,那麼該佇列就是一個只有佇列,在消費者結束後,該專有佇列也會自動清除,如果queue=''沒有設定名字的話,那麼就會自動生成一個
# 不會重複的佇列名

# 將queue繫結到指定交換機
channel.queue_bind(exchange='交換機', queue=queue_name)

print(' [*] Waiting for  message.')


def callback(ch, method, properties, body):
    print("消費者收到:{}".format(body.decode('utf-8')))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

該模式與簡單模式的還有一個區別就是,這裡的訊息佇列都是由消費者宣告的,所以如果是生產者先啟動,並將訊息發給交換機的畫,這裡的訊息就會丟失,所以我們也可以在消費者端宣告佇列並繫結交換機(不能是專有佇列),所以仔細想想,其實這所謂的釋出訂閱模式並沒有說什麼了不起,它不過是讓交換機同時推播多條訊息給繫結的佇列,我們當然也可以在簡單模式的基礎上多進行幾次basic_publish傳送訊息到指定的佇列。當然我們這樣做的話,可能就沒辦法做到由交換機的同時傳送了,效率可能也沒有一次basic_publish的高

(六)RabbitMQ RPC模式

下面實現由rpc遠端呼叫加減運算

使用者端

import pika
import uuid
import json

class RPC(object):

    def __init__(self):
        self.call_id = None
        self.response = None
        user_info = pika.PlainCredentials('root', 'root')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
        self.channel = self.connection.channel()

        # 建立一個此使用者端專用的queue,用於接收伺服器端發過來的訊息
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        # 判斷接收到的response是否屬於對應request
        if self.call_id == props.correlation_id:
            self.response = json.loads(body.decode('utf-8')).get('result')

    def call(self, func, param):
        self.response = None
        self.call_id = str(uuid.uuid4())  # 為該訊息指定uuid,類似於請求id
        self.channel.queue_declare(queue='rpc_queue')
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',  # 將訊息傳送到該queue
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,  # 從該queue中取訊息
                correlation_id=self.call_id,  # 為此次訊息指定uuid
            ),
            body=json.dumps(
                {
                    'func': func,
                    'param': {'a': param[0], 'b': param[1]}
                }
            )
        )
        
        self.connection.process_data_events(time_limit=3)# 與start_consuming()相似,可以設定超時引數
        return self.response

rpc = RPC()

print("傳送訊息到消費者,等待返回結果")

response = rpc.call(func='del', param=(1, 2))

print("收到來自消費者返回的結果:{}".format(response))

伺服器端

import pika
import json

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))

channel = connection.channel()

# 指定接收訊息的queue
channel.queue_declare(queue='rpc_queue')

def add_number(a, b):
    return a + b

def del_num(a, b):
    return a - b

execute_map = {
    'add': add_number,
    'del': del_num
}

def on_request(ch, method, props, body):
    body = json.loads(body.decode('utf-8'))
    func = body.get('func')
    param = body.get('param')
    result = execute_map.get(func)(param.get('a'), param.get('b'))
    print('進行{}運算,並將結果返回個消費者'.format(func))

    ch.basic_publish(exchange='',  # 使用預設交換機
                     routing_key=props.reply_to,  # response傳送到該queue
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),  # 使用correlation_id讓此response與請求訊息對應起來
                     body=json.dumps({'result': result}))

    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
# 從rpc_queue中取訊息,然後使用on_request進行處理
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

(七)說點啥

對於rabbitmq的模式還有Routing模式和Topics模式等,這裡就不復述了,其實pika對於RabbitMQ的使用還有很多細節和引數值得深究。這篇部落格也就是簡單的記錄下我對pika操作raabbitmq過程和簡單的理解

參考連結:

https://www.cnblogs.com/guyuyun/p/14970592.html

https://blog.csdn.net/wohu1104/category_9023593.html

(八)結語

到此這篇關於python對RabbitMQ的簡單入門使用的文章就介紹到這了,更多相關python RabbitMQ使用內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com