<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
RabbitMq 是實現了高階訊息佇列協定(AMQP)的開源訊息代理中介軟體。訊息佇列是一種應用程式對應用程式的通行方式,應用程式通過寫訊息,將訊息傳遞於佇列,由另一應用程式讀取 完成通訊。而作為中介軟體的 RabbitMq 無疑是目前最流行的訊息佇列之一。目前使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。
RabbitMQ總體架構
PS:生產者和消費者可能在不同的程式或主機中,當然也有可能一個程式有可能既是生產者,也是消費者。
RabbitMq 應用場景廣泛:
1.系統的高可用:日常生活當中各種商城秒殺,高流量,高並行的場景。當伺服器接收到如此大量請求處理業務時,有宕機的風險。某些業務可能極其複雜,但這部分不是高時效性,不需要立即反饋給使用者,我們可以將這部分處理請求拋給佇列,讓程式後置去處理,減輕伺服器在高並行場景下的壓力。
2.分散式系統,整合系統,子系統之間的對接,以及架構設計中常常需要考慮訊息佇列的應用。
apt-get update apt-get install erlang apt-get install rabbitmq-server #啟動rabbitmq: service rabbitmq-server start #停止rabbitmq: service rabbitmq-server stop #重啟rabbitmq: service rabbitmq-server restart #啟動rabbitmq外掛:rabbitmq-plugins enable rabbitmq_management
啟用rabbitmq_management外掛後就可以登入後臺管理頁面了,瀏覽器輸入ip:15672
自帶的密碼和使用者名稱都是guest,但是隻能本機登入
所以下面我們新增新使用者,和自定義許可權
#新增新使用者 rabbitmqctl add_user 使用者名稱 密碼 #給指定使用者新增管理員許可權 rabbitmqctl set_user_tags 使用者名稱 administrator 給使用者新增許可權 rabbitmqctl set_permissions -p / 使用者名稱 ".*" ".*" ".*"
在web頁面輸入使用者名稱,和密碼
python中使用pika操作RabbitMQ
pip install pika #皮卡皮卡,哈哈
上程式碼
# coding=utf-8 ### 生產者 import pika import time user_info = pika.PlainCredentials('root', 'root')#使用者名稱和密碼 connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))#連線伺服器上的RabbitMQ服務 # 建立一個channel channel = connection.channel() # 如果指定的queue不存在,則會建立一個queue,如果已經存在 則不會做其他動作,官方推薦,每次使用時都可以加上這句 channel.queue_declare(queue='hello') for i in range(0, 100): channel.basic_publish(exchange='',#當前是一個簡單模式,所以這裡設定為空字串就可以了 routing_key='hello',# 指定訊息要傳送到哪個queue body='{}'.format(i)# 指定要傳送的訊息 ) time.sleep(1) # 關閉連線 # connection.close()
PS:RabbitMQ中所有的訊息都要先通過交換機,空字串表示使用預設的交換機
# coding=utf-8 ### 消費者 import pika user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() # 如果指定的queue不存在,則會建立一個queue,如果已經存在 則不會做其他動作,生產者和消費者都做這一步的好處是 # 這樣生產者和消費者就沒有必要的先後啟動順序了 channel.queue_declare(queue='hello') # 回撥函數 def callback(ch, method, properties, body): print('消費者收到:{}'.format(body)) # channel: 包含channel的一切屬性和方法 # method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key # properties: basic_publish 通過 properties 傳入的引數 # body: basic_publish傳送的訊息 channel.basic_consume(queue='hello', # 接收指定queue的訊息 auto_ack=True, # 指定為True,表示訊息接收到後自動給訊息傳送方回覆確認,已收到訊息 on_message_callback=callback # 設定收到訊息的回撥函數 ) print('Waiting for messages. To exit press CTRL+C') # 一直處於等待接收訊息的狀態,如果沒收到訊息就一直處於阻塞狀態,收到訊息就呼叫上面的回撥函數 channel.start_consuming()
對於上面的這種模式,有一下兩個不好的地方:
一個是在我們的消費者還沒開始消費完佇列裡的訊息,如果這時rabbitmq服務掛了,那麼訊息佇列裡的訊息將會全部丟失,解決方法是在宣告佇列時,宣告佇列為可持久化儲存佇列,並且在生產者在將訊息插入到訊息佇列時,設定訊息持久化儲存,具體如下
# coding=utf-8 ### 生產者 import pika import time user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) # 建立一個channel channel = connection.channel() # 如果指定的queue不存在,則會建立一個queue,如果已經存在 則不會做其他動作,官方推薦,每次使用時都可以加上這句 channel.queue_declare(queue='durable_queue',durable=True) #PS:這裡不同種佇列不允許名字相同 for i in range(0, 100): channel.basic_publish(exchange='', routing_key='durable_queue', body='{}'.format(i), properties=pika.BasicProperties(delivery_mode=2) ) # 關閉連線 # connection.close()
消費者與上面的消費者沒有什麼不同,具體的就是消費宣告的佇列,也要是可持久化的佇列,還有就是,即使在生產者插入訊息時,設定當前訊息持久化儲存(properties=pika.BasicProperties(delivery_mode=2)),並不能百分百保證訊息真的被持久化,因為RabbitMQ掛掉的時候它可能還儲存在快取中,沒來得及同步到磁碟中
在生產者插入訊息後,立刻停止rabbitmq,並重新啟動,其實我們在web管理頁面也可看到未被消費的資訊,當然在啟動消費者後也成功接收到了訊息
上面說的第二點不好就是,如果在消費者獲取到佇列裡的訊息後,在回撥函數的處理過程中,消費者突然出錯或程式崩潰等異常,那麼就會造成這條訊息並未被實際正常的處理掉。為了解決這個問題,我們只需在消費者basic_consume(auto_ack=False),並在回撥函數中設定手動應答即可ch.basic_ack(delivery_tag=method.delivery_tag),具體如下
# coding=utf-8 ### 消費者 import pika import time user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() # 如果指定的queue不存在,則會建立一個queue,如果已經存在 則不會做其他動作,生產者和消費者都做這一步的好處是 # 這樣生產者和消費者就沒有必要的先後啟動順序了 channel.queue_declare(queue='queue') # 回撥函數 def callback(ch, method, properties, body): time.sleep(5) ch.basic_ack(delivery_tag=method.delivery_tag) print('消費者收到:{}'.format(body.decode('utf-8'))) # channel: 包含channel的一切屬性和方法 # method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key # properties: basic_publish 通過 properties 傳入的引數 # body: basic_publish傳送的訊息 channel.basic_consume(queue='queue', # 接收指定queue的訊息 auto_ack=False, # 指定為False,表示取消自動應答,交由回撥函數手動應答 on_message_callback=callback # 設定收到訊息的回撥函數 ) # 應答的本質是告訴訊息佇列可以將這條訊息銷燬了 print('Waiting for messages. To exit press CTRL+C') # 一直處於等待接收訊息的狀態,如果沒收到訊息就一直處於阻塞狀態,收到訊息就呼叫上面的回撥函數 channel.start_consuming()
這裡只需要設定消費者,生產者並不要修改
還有就是在上的使用方式在,都是一個生產者和一個消費者,還有一種情況就是,一個生產者和多個消費者,即多個消費者同時監聽一個訊息佇列,這時候佇列裡的訊息就是輪詢分發(即如果訊息佇列裡有100條資訊,如果有2個消費者,那麼每個就會收到50條資訊),但是在某些情況下,不同的消費者處理任務的能力是不同的,這時還按照輪詢的方式分發訊息並不是很合理,那麼只需要再配合手動應答的方式,設定消費者接收的訊息沒有處理完,佇列就不要給我放送新的訊息即可,具體設定方式如下:
# coding=utf-8 ### 消費者 import pika import time user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() # 如果指定的queue不存在,則會建立一個queue,如果已經存在 則不會做其他動作,生產者和消費者都做這一步的好處是 # 這樣生產者和消費者就沒有必要的先後啟動順序了 channel.queue_declare(queue='queue') # 回撥函數 def callback(ch, method, properties, body): time.sleep(0)#通過設定休眠時間來模擬不同消費者的處理時間 ch.basic_ack(delivery_tag=method.delivery_tag) print('消費者收到:{}'.format(body.decode('utf-8'))) # prefetch_count表示接收的訊息數量,當我接收的訊息沒有處理完(用basic_ack標記訊息已處理完畢)之前不會再接收新的訊息了 channel.basic_qos(prefetch_count=1) # 還有就是這個設定必須在basic_consume之上,否則不生效 channel.basic_consume(queue='queue', # 接收指定queue的訊息 auto_ack=False, # 指定為False,表示取消自動應答,交由回撥函數手動應答 on_message_callback=callback # 設定收到訊息的回撥函數 ) # 應答的本質是告訴訊息佇列可以將這條訊息銷燬了 print('Waiting for messages. To exit press CTRL+C') # 一直處於等待接收訊息的狀態,如果沒收到訊息就一直處於阻塞狀態,收到訊息就呼叫上面的回撥函數 channel.start_consuming()
PS:這種情況必須關閉自動應答ack,改成手動應答。使用basicQos(perfetch=1)限制每次只傳送不超過1條訊息到同一個消費者,消費者必須手動反饋告知佇列,才會傳送下一個
釋出訂閱會將訊息傳送給所有的訂閱者,而訊息佇列中的資料被消費一次便消失。所以,RabbitMQ實現釋出和訂閱時,會為每一個訂閱者建立一個佇列,而釋出者釋出訊息時,會將訊息放置在所有相關佇列中
這個模式中會引入交換機的概念,其實在RabbitMQ中,所有的生產者都不會直接把訊息傳送到佇列中,甚至生產者都不知道訊息在發出後有沒有傳送到queue中,事實上,生產者只能將訊息傳送給交換機,由交換機來決定傳送到哪個佇列中。
交換機的一端用來從生產者中接收訊息,另一端用來傳送訊息到佇列,交換機的型別規定了怎麼處理接收到的訊息,釋出訂閱模式使用到的交換機型別為 fanout ,這種交換機型別非常簡單,就是將接收到的訊息廣播給已知的(即繫結到此交換機的)所有消費者。
當然,如果不想使用特定的交換機,可以使用 exchange=’’ 表示使用預設的交換機,預設的交換機會將訊息傳送到 routing_key 指定的queue,可以參考簡單模式。
上程式碼:
#生產者 import pika user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() # 建立一個指定名稱的交換機,並指定型別為fanout,用於將接收到的訊息廣播到所有queue中 channel.exchange_declare(exchange='交換機', exchange_type='fanout') # 將訊息傳送給指定的交換機,在fanout型別中,routing_key=''表示不用傳送到指定queue中, # 而是將傳送到繫結到此交換機的所有queue channel.basic_publish(exchange='交換機', routing_key='', body='這是一條測試訊息')
#消費者 import pika user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() channel.exchange_declare(exchange='交換機', exchange_type='fanout') # 使用RabbitMQ給自己生成一個專有的queue result = channel.queue_declare(queue='333') # result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue # 這裡如果設定exclusive=True引數,那麼該佇列就是一個只有佇列,在消費者結束後,該專有佇列也會自動清除,如果queue=''沒有設定名字的話,那麼就會自動生成一個 # 不會重複的佇列名 # 將queue繫結到指定交換機 channel.queue_bind(exchange='交換機', queue=queue_name) print(' [*] Waiting for message.') def callback(ch, method, properties, body): print("消費者收到:{}".format(body.decode('utf-8'))) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
該模式與簡單模式的還有一個區別就是,這裡的訊息佇列都是由消費者宣告的,所以如果是生產者先啟動,並將訊息發給交換機的畫,這裡的訊息就會丟失,所以我們也可以在消費者端宣告佇列並繫結交換機(不能是專有佇列),所以仔細想想,其實這所謂的釋出訂閱模式並沒有說什麼了不起,它不過是讓交換機同時推播多條訊息給繫結的佇列,我們當然也可以在簡單模式的基礎上多進行幾次basic_publish傳送訊息到指定的佇列。當然我們這樣做的話,可能就沒辦法做到由交換機的同時傳送了,效率可能也沒有一次basic_publish的高
下面實現由rpc遠端呼叫加減運算
使用者端
import pika import uuid import json class RPC(object): def __init__(self): self.call_id = None self.response = None user_info = pika.PlainCredentials('root', 'root') self.connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) self.channel = self.connection.channel() # 建立一個此使用者端專用的queue,用於接收伺服器端發過來的訊息 result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) def on_response(self, ch, method, props, body): # 判斷接收到的response是否屬於對應request if self.call_id == props.correlation_id: self.response = json.loads(body.decode('utf-8')).get('result') def call(self, func, param): self.response = None self.call_id = str(uuid.uuid4()) # 為該訊息指定uuid,類似於請求id self.channel.queue_declare(queue='rpc_queue') self.channel.basic_publish( exchange='', routing_key='rpc_queue', # 將訊息傳送到該queue properties=pika.BasicProperties( reply_to=self.callback_queue, # 從該queue中取訊息 correlation_id=self.call_id, # 為此次訊息指定uuid ), body=json.dumps( { 'func': func, 'param': {'a': param[0], 'b': param[1]} } ) ) self.connection.process_data_events(time_limit=3)# 與start_consuming()相似,可以設定超時引數 return self.response rpc = RPC() print("傳送訊息到消費者,等待返回結果") response = rpc.call(func='del', param=(1, 2)) print("收到來自消費者返回的結果:{}".format(response))
伺服器端
import pika import json user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() # 指定接收訊息的queue channel.queue_declare(queue='rpc_queue') def add_number(a, b): return a + b def del_num(a, b): return a - b execute_map = { 'add': add_number, 'del': del_num } def on_request(ch, method, props, body): body = json.loads(body.decode('utf-8')) func = body.get('func') param = body.get('param') result = execute_map.get(func)(param.get('a'), param.get('b')) print('進行{}運算,並將結果返回個消費者'.format(func)) ch.basic_publish(exchange='', # 使用預設交換機 routing_key=props.reply_to, # response傳送到該queue properties=pika.BasicProperties( correlation_id=props.correlation_id), # 使用correlation_id讓此response與請求訊息對應起來 body=json.dumps({'result': result})) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) # 從rpc_queue中取訊息,然後使用on_request進行處理 channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) print(" [x] Awaiting RPC requests") channel.start_consuming()
對於rabbitmq的模式還有Routing模式和Topics模式等,這裡就不復述了,其實pika對於RabbitMQ的使用還有很多細節和引數值得深究。這篇部落格也就是簡單的記錄下我對pika操作raabbitmq過程和簡單的理解
參考連結:
https://www.cnblogs.com/guyuyun/p/14970592.html
https://blog.csdn.net/wohu1104/category_9023593.html
到此這篇關於python對RabbitMQ的簡單入門使用的文章就介紹到這了,更多相關python RabbitMQ使用內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45