<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
RabbitMq 是實現了高階訊息佇列協定(AMQP)的開源訊息代理中介軟體。訊息佇列是一種應用程式對應用程式的通行方式,應用程式通過寫訊息,將訊息傳遞於佇列,由另一應用程式讀取 完成通訊。而作為中介軟體的 RabbitMq 無疑是目前最流行的訊息佇列之一。
RabbitMq 應用場景廣泛:
生產者(producter):佇列訊息的產生者,負責生產訊息,並將訊息傳入佇列
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq使用者名稱和密碼 # 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立 result = channel.queue_declare(queue = 'python-test') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向佇列插入數值 routing_key是佇列名 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message) print(message) connection.close()
消費者(consumer):佇列訊息的接收者,負責 接收並處理 訊息佇列中的訊息
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 申明訊息佇列,訊息在這個佇列傳遞,如果不存在,則建立佇列 channel.queue_declare(queue = 'python-test', durable = False) # 定義一個回撥函數來處理訊息佇列中的訊息,這裡是列印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) # 告訴rabbitmq,用callback來接收訊息 channel.basic_consume('python-test',callback) # 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理 channel.start_consuming()
MQ預設建立的是臨時 queue 和 exchange,如果不宣告持久化,一旦 rabbitmq 掛掉,queue、exchange 將會全部丟失。所以我們一般在建立 queue 或者 exchange 的時候會宣告 持久化。
1.queue 宣告持久化
# 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立。durable = True 代表訊息佇列持久化儲存,False 非持久化儲存 result = channel.queue_declare(queue = 'python-test',durable = True)
2.exchange 宣告持久化
# 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立.durable = True 代表exchange持久化儲存,False 非持久化儲存 channel.exchange_declare(exchange = 'python-test', durable = True)
注意:如果已存在一個非持久化的 queue 或 exchange ,執行上述程式碼會報錯,因為當前狀態不能更改 queue 或 exchange 儲存屬性,需要刪除重建。如果 queue 和 exchange 中一個宣告了持久化,另一個沒有宣告持久化,則不允許繫結。
3.訊息持久化
雖然 exchange 和 queue 都申明瞭持久化,但如果訊息只存在記憶體裡,rabbitmq 重啟後,記憶體裡的東西還是會丟失。所以必須宣告訊息也是持久化,從記憶體轉存到硬碟。
# 向佇列插入數值 routing_key是佇列名。delivery_mode = 2 宣告訊息在佇列中持久化,delivery_mod = 1 訊息非持久化 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))
4.acknowledgement 訊息不丟失
消費者(consumer)呼叫callback函數時,會存在處理訊息失敗的風險,如果處理失敗,則訊息丟失。但是也可以選擇消費者處理失敗時,將訊息回退給 rabbitmq ,重新再被消費者消費,這個時候需要設定確認標識。
channel.basic_consume(callback,queue = 'python-test', # no_ack 設定成 False,在呼叫callback函數時,未收到確認標識,訊息會重回佇列。True,無論呼叫callback成功與否,訊息都被消費掉 no_ack = False)
rabbitmq 的釋出與訂閱要藉助交換機(Exchange)的原理實現:
Exchange 一共有三種工作模式:fanout, direct, topicd
這種模式下,傳遞到 exchange 的訊息將會轉發到所有與其繫結的 queue 上。
釋出者:
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq使用者名稱和密碼 # 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立。durable = True 代表exchange持久化儲存,False 非持久化儲存 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向佇列插入數值 routing_key是佇列名。delivery_mode = 2 宣告訊息在佇列中持久化,delivery_mod = 1 訊息非持久化。routing_key 不需要設定 channel.basic_publish(exchange = 'python-test',routing_key = '',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
訂閱者:
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 建立臨時佇列,佇列名傳空字元,consumer關閉後,佇列自動刪除 result = channel.queue_declare('',exclusive=True) # 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立。durable = True 代表exchange持久化儲存,False 非持久化儲存 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout') # 繫結exchange和佇列 exchange 使我們能夠確切地指定訊息應該到哪個佇列去 channel.queue_bind(exchange = 'python-test',queue = result.method.queue) # 定義一個回撥函數來處理訊息佇列中的訊息,這裡是列印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) channel.basic_consume(result.method.queue,callback,# 設定成 False,在呼叫callback函數時,未收到確認標識,訊息會重回佇列。True,無論呼叫callback成功與否,訊息都被消費掉 auto_ack = False) channel.start_consuming()
這種工作模式的原理是 訊息傳送至 exchange,exchange 根據 路由鍵(routing_key)轉發到相對應的 queue 上。
釋出者:
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq使用者名稱和密碼 # 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立。durable = True 代表exchange持久化儲存,False 非持久化儲存 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 指定 routing_key。delivery_mode = 2 宣告訊息在佇列中持久化,delivery_mod = 1 訊息非持久化 channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
消費者:
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 建立臨時佇列,佇列名傳空字元,consumer關閉後,佇列自動刪除 result = channel.queue_declare('',exclusive=True) # 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立。durable = True 代表exchange持久化儲存,False 非持久化儲存 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct') # 繫結exchange和佇列 exchange 使我們能夠確切地指定訊息應該到哪個佇列去 channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId') # 定義一個回撥函數來處理訊息佇列中的訊息,這裡是列印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) #channel.basic_qos(prefetch_count=1) # 告訴rabbitmq,用callback來接受訊息 channel.basic_consume(result.method.queue,callback, # 設定成 False,在呼叫callback函數時,未收到確認標識,訊息會重回佇列。True,無論呼叫callback成功與否,訊息都被消費掉 auto_ack = False) channel.start_consuming()
這種模式和第二種模式差不多,exchange 也是通過 路由鍵 routing_key 來轉發訊息到指定的 queue 。 不同點是 routing_key 使用正規表示式支援模糊匹配,但匹配規則又與常規的正規表示式不同,比如“#”是匹配全部,“*”是匹配一個詞。
舉例:routing_key =“#orderid#”,意思是將訊息轉發至所有 routing_key 包含 “orderid” 字元的佇列中。程式碼和模式二 類似,就不貼出來了。
以上就是python操作RabbitMq的三種工作模式的詳細內容,更多關於python操作RabbitMq工作模式的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45