2021-05-12 14:32:11
RabbitMQ 學習初入門
RabbitMQ是一個訊息佇列,我們可以使用RabbitMQ 做訊息佇列,訊息通知的業務功能,而且根據網上的不可靠訊息得出,RabbitMQ 的效能水平甚至比 activeMQ 還要好,所以也是我選擇認真去學習RabbitMQ的原因,當然我也有做個關於 activeMQ 的一些簡單的 Demo 有機會的話可以分享出來~
Rabbit 使用 Erlang 來編寫的AMQP 伺服器。 什麼是 AMQP 本人已經百度給大家了:
AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協定,是應用層協定的一個開放標準,為訊息導向中介層設計。基於此協定的用戶端與訊息中介軟體可傳遞訊息,並不受用戶端/中介軟體不同產品,不同的開發語言等條件的限制。Erlang中的實現有 RabbitMQ等。
哎喲,先從按照說起吧~ 我使用macbook 來進行學習和開發的 順便提一下 本人是位果粉,別噴~ 使用 MAC OS 安非常簡單,還我是建議使用 MAC OS 的同學使用 brew 進行開發,沒有別的 就是因為方便。
使用這個命令之前 如果沒有安裝過 Homebrew 就安裝一下吧 有機會我也分享一下安裝方式 順便介紹一下這個讓你懶癌晚期的Homebrew。
開啟終端輸入:
brew install rabbitmq
然後會告訴你 無法獲得 /usr/local/sbin 的寫入許可權。 因為是英文的錯誤提示,本人會看不會寫 所以直接寫中文給各位看了~ 還是那句 別噴~ 謝謝
好,接下來讓他有許可權,其實這個目錄是沒有的sbin。
mkdir /usr/local/sbin 建立目錄
chmod 777 /usr/local/sbin 開許可權,全開別煩
brew link rabbitmq link 一下
然後就沒有然後了
啟動 rabbitMQ
MacBook-Pro:~ Tony$ /usr/local/sbin/rabbitmq-server
RabbitMQ 3.5.6. Copyright (C) 2007-2015 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
##########
Starting broker... completed with 10 plugins.
目前為止已經安裝完 rabbitMQ 並且啟動起來 了 簡單到飛
開始廢話 說說概念,別方 我也是抄書的~
消費者和生產者,由於書本的篇幅太長,就抄了!簡單明瞭告訴你們,訊息的產生者就是生產者,獲得訊息的稱為訊息消費者。我QQ M了你一下 我就是訊息的生產者了,你在QQ看到我的訊息了,你就是訊息的消費者了,這才是正常寫書方式嘛··· 舉個開發的業務背景的例子,我支付了一張訂單,這個業務產生了幾個次要的業務,例如積分業務,我購買了訂單,訂單核心功能處理訂單的業務,然後生產了一個訊息並通過rabbitMQ 傳送了一個訊息,內容為產生了一個訂單,然後訂閱了訊息的訊息消費者就會消費訊息,然後根據訊息處理自己的業務,該幹嘛幹嘛去~ 積分業務就去加你的積分等等。當然這個只是舉例,別吐槽 可以在台伺服器或者一個業務方法上實現。 上面的舉例對於一些分散式的微服務上是非常常見的方式了~
幾個重要的概念:
通道:在連線到rabbitMQ 之後,你將可以建立並獲得一個AMQP通道,通道是建立在“真實的”TCP連線內的虛擬連結。所有的AMQP命令都是通過 AMQP 通道發出去的,每條通道會被指派一個唯一的ID。所有發布訊息還是訂閱都是通過AMQP 通道去處理的。這個概念非常像埠號【但是不是埠啊 ,別說我混淆你】,而且他的英文名字可以理解到他的意思channel 有使用NIO 的開發者已經比較了解~ 其實使用AMQP通道的原因非常簡單,建立TCP連線的效能代價是非常高的,當你在不同的執行緒當中去建立TCP連線的方式去和RabbitMQ進行通訊的代價是十分高的,但是在多個不同的執行緒當中通過channel 我們就可以通過同一個TCP連線進行通訊了。好像寫得好亂不知道他們理不理解我寫什麼,唉 算了 本人的水平有限,況且大多數情況下都是我自己看而已,放棄~ 抽象圖如下 醜,不過先將就~
交換器 與 路由鍵: 這個好難解釋啊~ 直接說工作流程吧~ 哎喲其實應該把 佇列寫到這裡來,算了吧 不改了。 先記住,交換器當中裡面有N個佇列,而路由鍵 是讓交換器知道訊息應該放到哪個佇列當中去。別廢話舉例: 產生訊息 要告訴 RabbitMQ 這個訊息是放到那個交換器上,【注意:交換器是由開發者去建立的你可以搞N個交換器出來,沒有管你】
上程式碼:
this.channel.basicPublish(“hello-exchange”,”hola”,properties, SerializationUtils.serialize(msg));
首先 hello-exchange 是交換器的名字, hola 是路由鍵 這個讓hello-exchange 去判斷應該分發到那個佇列當中,SerializationUtils.serialize(msg) 將一個訊息序列化。 channel 就不用說啦上面說了,是一個AMQP通道。
交換器有很多不同的型別一會會一一介紹。交換器的型別不同也註定了訊息會分發到哪個佇列當中,所有尤其重要。
工作流程:
1、消費者訂閱訊息,先建立一個交換器,如果交換器已經存在即返回一個交換器。當然你可以passive 為 true 如果 passive 為 true , 交換器已經存在返回一個已經存在的交換器,否則報錯。我暫時不知道用在哪裡,所以 先記住一般情況下 建立一個交換器,如果存在就返回一個已經存在的交換器,如果不存在則建立並返回。
JAVA 程式碼如下【直接給原始碼的方法定義,是不是很貼心】:
備註是自己翻譯的別噴
/**
* Declare an exchange, via an interface that allows the complete set of
* arguments.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange 【交換器名稱】
* @param type the exchange type 【交換器型別 一會說 別著急】
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) 【是否持久化交換器,這個後面說,還是簡單說一下,糾結帝~ 就是交換器中的訊息會持久化,就是會存在硬碟當中,宕機或重新啟動服務時會自動恢復當時會犧牲效能,除非你的SSD硬碟好快,當我沒有說過~】
* @param autoDelete true if the server should delete the exchange when it is no longer in use 【當這個交換器 已經沒有人使用的時候 會自動刪除,longer 長時期 當時我不知道長時期是什麼時候,所以我會記住是 沒人用就自己刪掉 自動消失】
* @param internal true if the exchange is internal, i.e. can't be directly
* published to by a client. 【不知道什麼鬼意思,如果是內部 不會直接傳送到用戶端,我一般寫false ,真心不知道他什麼意思,後面學下去應該明白】
* @param arguments other properties (construction arguments) for the exchange 【其他引數 暫時沒有用到】
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered 【會丟擲IO異常】
*/
Exchange.DeclareOk exchangeDeclare(String exchange,String type, boolean durable, boolean autoDelete, boolean internal,Map<String, Object> arguments) throws IOException;
2、消費者建立佇列,並繫結佇列到交換器當中,上寫路由鍵,讓交換器知道這個路由鍵的訊息是跑到這個佇列當中的。
JAVA 程式碼如下【直接給原始碼的方法定義】:
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue 【佇列名稱】
* @param durable true if we are declaring a durable queue (the queue will survive a server restart) 【持久化,這裡特別說一下,如果你想訊息是持久化的,必須訊息是持久化的,交換器也是持久化的,佇列更是持久化的,其中一個不是也無法恢復訊息】
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection) 【私有的,獨有的。 這個佇列之後這個應用可以消費,上面的英文注釋是 說restricted to this connection 就是限制在這個連線可以消費,就是說不限制channel通道咯,具體沒有試過,但是應該是這樣,除非備註騙我,我讀得書少,你唔好呃我!!!】
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 【沒有人使用自動刪除】 注意:如果exclusive為true 最好 autodelete都為true 至於為什麼 這麼簡單自己想~
* @param arguments other properties (construction arguments) for the queue 【其他引數沒有玩過】
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
係結佇列和交換器 並指定路由鍵
/**
* Bind a queue to an exchange.
* @see com.rabbitmq.client.AMQP.Queue.Bind
* @see com.rabbitmq.client.AMQP.Queue.BindOk
* @param queue the name of the queue 【佇列名稱】
* @param exchange the name of the exchange 【交換器名稱】
* @param routingKey the routine key to use for the binding 【路由鍵】
* @param arguments other properties (binding parameters) 【其他引數 還是那句沒有玩過】
* @return a binding-confirm method if the binding was successfully created
* @throws java.io.IOException if an error is encountered
*/
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
3、然後消費者就去訂閱訊息咯,注意在JAVA裡面的訂閱方法會產生執行緒阻塞的。
JAVA 程式碼:
訂閱訊息並消費
/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag.
* @param queue the name of the queue 【所訂閱消費的佇列】
* @param autoAck true if the server should consider messages 【是否為自動確定訊息,好像TCP的ack syn 啊,可怕的7層模型!!!一般寫true就可以】
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback an interface to the consumer object 【回撥callback 這個馬上上程式碼看看】
* @return the consumerTag generated by the server
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
接收到訊息之後,馬上有回撥,來看看回撥介面:
/**
* Interface for application callback objects to receive notifications and messages from
* a queue by subscription.
* Most implementations will subclass {@link DefaultConsumer}.
* <p/>
* The methods of this interface are invoked in a dispatch
* thread which is separate from the {@link Connection}'s thread. This
* allows {@link Consumer}s to call {@link Channel} or {@link
* Connection} methods without causing a deadlock.
* <p/>
* The {@link Consumer}s on a particular {@link Channel} are invoked serially on one or more
* dispatch threads. {@link Consumer}s should avoid executing long-running code
* because this will delay dispatch of messages to other {@link Consumer}s on the same
* {@link Channel}.
*
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
* @see Channel#basicCancel
*/
public interface Consumer {
/**
* Called when the consumer is registered by a call to any of the
* {@link Channel#basicConsume} methods.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
*/
void handleConsumeOk(String consumerTag);
/**
* Called when the consumer is cancelled by a call to {@link Channel#basicCancel}.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
*/
void handleCancelOk(String consumerTag);
/**
* Called when the consumer is cancelled for reasons <i>other than</i> by a call to
* {@link Channel#basicCancel}. For example, the queue has been deleted.
* See {@link #handleCancelOk} for notification of consumer
* cancellation due to {@link Channel#basicCancel}.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @throws IOException
*/
void handleCancel(String consumerTag) throws IOException;
/**
* Called when either the channel or the underlying connection has been shut down.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param sig a {@link ShutdownSignalException} indicating the reason for the shut down
*/
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
/**
* Called when a <code><b>basic.recover-ok</b></code> is received
* in reply to a <code><b>basic.recover</b></code>. All messages
* received before this is invoked that haven't been <i>ack</i>'ed will be
* re-delivered. All messages received afterwards won't be.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
*/
void handleRecoverOk(String consumerTag);
/**
* Called when a <code><b>basic.deliver</b></code> is received for this consumer.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param envelope packaging data for the message
* @param properties content header data for the message
* @param body the message body (opaque, client-specific byte array)
* @throws IOException if the consumer encounters an I/O error while processing the message
* @see Envelope 寶寶累了 看重點,這個方法就是訊息到達的時候回撥的方法其他你們喜歡研究 可以認真看看英文備註。
*/
void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException;
}
注意一下:其實也可以使用其他訂閱方式去消費訊息的,可以使用get的方式去獲得一條訊息,然後會在佇列當中給你一條訊息,但是你會想就不需要使用上面的訂閱方法啦,直接跑個迴圈就可以啦。其實get方法會開啟訂閱獲得一條訊息後關閉訂閱,這樣會產生不必要的效能開銷的,除非老闆讓你搞卡一點,讓客戶掏點優化費,不然就別這樣幹了。get的方法這裡就先不介紹啦~
4、上面所說的都是訊息消費者的工作,這個step開始說訊息生產者要做的。開啟一個交換器,當然這個交換器應該是存在不用建立的因為 上面所說消費者已經建立過了,但是其實誰去先建立都是可以的,消費者也可以建立,生產者也可以,這個倒是沒有什麼關係,主要是看你的業務需求【這句話甩鍋用】。由於已經貼出過建立交換器的程式碼所以就不貼了,看上面。
與上面的交換器建立一樣。不貼程式碼了自己拖上去看吧~
5、好像沒有什麼別的事情了,就是產生一個訊息然後發過去。上程式碼
JAVA程式碼:
/**
* Publish a message
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @param exchange the exchange to publish the message to 【傳送到那個交換器上】
* @param routingKey the routing key 【路由鍵 決定訊息去哪個佇列裡面混】
* @param props other properties for the message - routing headers etc 【其他訊息引數, 哎~ 這個我玩過,一會DEMO時間,表演一下】
* @param body the message body 【訊息體,一般把物件序列化一下發過去】
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
然後就沒有然後了 上面 第三步的 訂閱方法中的回撥 就會有這個訊息了。 好又到說概念的時候了。 困的同學先休息。
訊息的投遞方式,我也睏了其實:
【1】 訊息MESSAGE_A 到達 queue_A 佇列
【2】 RabbitMQ 把 MESSAGE_A 傳送到 APPLICATION A(消費者)
【3】 APPLICATION A 確定收到了訊息 發個ack 高興一下
【4】 RabbitMQ 把 訊息從 queue_A 佇列刪除。上面其實也有說過,autoack引數 有印象吧~ 所以autoack 設定成true 就沒錯了。這裡就可以開展其他話題了,如果多個訂閱者怎麼辦呢,是不是多個訂閱者都會收到訊息呢,答案是否定的。RabbitMQ的佇列會對訂閱者做一個迴圈,例如目前有兩個訂閱者訂閱了同一個佇列,serverA 和 serverB 他會迴圈去傳送訊息,佇列有 ID 1-10 的訊息,ID 1訊息會由 serverA 處理 ID 2 訊息會由 serverB 處理 ID 3 訊息會由 serverA 去處理 如此類推。
如果serverA 在傳送ACK應答給 RabbitMQ之前 斷開連線【就是服務掛了,宕機了】RabbitMQ 會認為這個訊息沒有處理,即沒有消費到這個訊息,會把這個訊息傳送到 serverB 去處理。
而且在訊息沒有ACK之前,RabbitMQ不會發你第二條訊息的,所以如果你想等待訊息的任務處理完之後再給第二個訊息的話,可以將autoDelete設定成false,這樣你就可以在訊息處理完之後再去ack。???樣也是一個非常通用的場景,以防扎堆處理訊息。
其實這樣的設計也是非常好的,如果你出現了業務錯誤,執行訊息處理的時候,剛好出現問題了,你可以通過斷開連線的方式【這裡所指的斷開是斷開RabbitMQ的connection】讓這個訊息交給下個訂閱者,【這裡說一下,如果佇列沒有消費者去訂閱訊息的話,訊息會存在RabbitMQ當中等待有消費者去訂閱再去傳送】,當然這也是在你沒有ack的情況下。在高階版本的RabbitMQ中可以使用reject命令,讓這個訊息直接傳遞到下個訂閱者。【在RabbitMQ2.0可以使用】
AMQP通道channel與訂閱
如果 一個channel 訂閱了一個佇列就不能訂閱別的佇列了,也就是說一個channel只能訂閱一個佇列。所以你需要取消原來的訂閱,並將通道設定為“傳輸”模式。後面有時間寫後面的文章可能會說到。
交換器的型別:
direct :這個簡單,只要路由鍵一模一樣就投遞到相應的佇列當中。
fanout :這個也簡單,訊息會投遞到所有系結到這個交換器的佇列當中。
topic :這個就複雜一點了,我很客觀的簡單就簡單 複雜就複雜 從不忽悠。但是這個也好常用不能不學,好累。在路由鍵當中可以使用萬用字元。怎麼說呢,好糾結啊 不知道怎麼解釋啊 怎麼辦 線上等。
舉個例子吧,係結佇列:
//係結佇列 路由鍵是 tony.teamA 有路由鍵為tony.teamA的訊息就投遞到這裡
this.channel.queueBind("queueA","topic_exchangeA","tony.teamA",null);
//係結佇列 路由鍵是 yan.teamA 有路由鍵為tony.teamB的訊息就投遞到這裡
this.channel.queueBind("queueB","topic_exchangeA","yan.teamA",null);
//係結佇列 路由鍵是 *.teamA 有路由鍵為.teamA結尾的訊息就投遞到這裡
this.channel.queueBind("queueC","topic_exchangeA","*.teamA",null);
//係結佇列 路由鍵是 chao.teamB 有路由鍵為chao.teamB的訊息就投遞到這裡
this.channel.queueBind("queueD","topic_exchangeA","chao.teamB",null);
//係結佇列 路由鍵是 *.teamB 有路由鍵為.teamB結尾的訊息就投遞到這裡
this.channel.queueBind("queueE","topic_exchangeA","*.teamB",null);
//係結佇列 路由鍵是 # 所有在這個交換器的訊息都投遞到這裡
this.channel.queueBind("queueF","topic_exchangeA","#",null);
// queueA[路由鍵:tony.teamA]、queueC[路由鍵:*.teamA]、queueF[路由鍵:#]會收到訊息
this.channel.basicPublish("topic_exchangeA","tony.teamA",properties,SerializationUtils.serialize(msg));
// queueD[路由鍵:chao.teamB]、queueE[路由鍵:*.teamB]、queueF[路由鍵:#]會收到訊息
this.channel.basicPublish("topic_exchangeA","chao.teamB",properties,SerializationUtils.serialize(msg));
然後基本的rabbitMQ 的東西就搞好了。不過這只是開始~
簡單說一下虛擬主機和隔離吧,簡單來說想MySQL 你可以建立很多個庫,裡面有很多個表。然後呢 rabbitMQ可以建立很多個虛擬主機,虛擬主機裡面有很多個交換器,交換器裡面有很多個佇列,解釋得完美。預設會提供一個 預設虛擬主機 vhost : “/”。後面找時間再說這個 vhost 吧。
重頭大戲上DEMO,由於方便我閱讀回憶,所以我忽略的封裝性,一切以容易快速看懂和回憶為目標。別噴~
生產者:
package com.maxfunner;
import com.maxfunner.mq.EndPoint;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Producer
*/
public class Producer {
private Connection connection;
private Channel channel;
private Map<Long, String> messageMap = new HashMap<Long, String>();
private int maxID = 0;
private static final String EXCHANGE_NAME = "MY_EXCHANGE";
public void createConnectionAndChannel() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //伺服器地址
factory.setUsername("guest"); //預設使用者名稱
factory.setPassword("guest"); //預設密碼
factory.setPort(5672); //預設埠,對就是這麼屌全部預設的
this.connection = factory.newConnection(); //建立連結
this.channel = this.connection.createChannel();
}
public void initChannelAndCreateExchange() throws IOException {
this.channel.confirmSelect(); //啟用訊息確認已經投遞成功的回撥
/**
* 建立了一個交換器,型別為 direct 非持久化 自動刪除 沒有額外引數
*/
this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);
this.channel.addConfirmListener(new ConfirmListener() {
/**
* 成功的時候回撥【這個是當訊息到達交換器的時候回撥】
* @param deliveryTag 每一條訊息都有一個唯一ID【只是同一個channel唯一】,每次發出訊息遞增1 因為同一個channel所有也保證了訊息的流水性。
* @param multiple
* @throws IOException
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
String message = messageMap.get(deliveryTag);
System.out.println("message : " + message + " ! 傳送成功");
messageMap.remove(message);
//最後一個訊息都搞掂之後 關閉所有東西
if (deliveryTag >= maxID) {
closeAnything();
}
}
/**
* 失敗的時候回撥
* @param deliveryTag 每一條訊息都有一個唯一ID【只是同一個channel唯一】,每次發出訊息遞增1 因為同一個channel所有也保證了訊息的流水性。
* @param multiple
* @throws IOException
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
String message = messageMap.get(deliveryTag);
System.out.println("message : " + message + " ! 傳送失敗");
messageMap.remove(message); //傳送失敗就不重發了,發脾氣
//最後一個訊息都搞掂之後 關閉所有東西
if (deliveryTag >= maxID) {
closeAnything();
}
}
});
}
public void sendMessage(String message) throws IOException {
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("text/plain") //指定是一個文字
.build();
// 傳送一個訊息 到 EXCHANGE_NAME 的交換器中 路由鍵為 KEY_A 傳送 message 之前序列化一下 具體用什麼包上面import自己看
this.channel.basicPublish(EXCHANGE_NAME, "KEY_A", properties, SerializationUtils.serialize(message));
}
public void closeAnything() throws IOException {
this.channel.close(); //跪安吧 小channel
this.connection.close(); //你也滾吧 connection
}
public static void main(String[] args) throws IOException {
Producer producer = new Producer();
producer.createConnectionAndChannel();
producer.initChannelAndCreateExchange();
List<String> messageList = new ArrayList<String>();
messageList.add("message_A");
messageList.add("message_B");
messageList.add("message_C");
messageList.add("message_D");
messageList.add("message_E");
messageList.add("message_F");
producer.maxID = messageList.size(); //記錄最後一個ID 當最後一個訊息傳送成功後關閉連線
//注意:因為channel產生的ID 是從1開始的
for (int i = 1; i <= messageList.size(); i++) {
producer.messageMap.put(new Long(i), messageList.get(i - 1)); //這裡看懂了嗎?沒看懂也沒有辦法了,這裡我真不知道怎麼解釋
producer.sendMessage(messageList.get(i - 1));
}
}
}
消費者:
package com.maxfunner;
import com.maxfunner.mq.QueueConsumer;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Consumer
*/
public class Consumer {
private Connection connection;
private Channel channel;
private Map<Integer,String> messageMap = new HashMap<Integer, String>();
private static final String EXCHANGE_NAME = "MY_EXCHANGE";
/**
* 對,你猜得一點都沒有錯,我是複製的
* @throws IOException
*/
public void createConnectionAndChannel() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //伺服器地址
factory.setUsername("guest"); //預設使用者名稱
factory.setPassword("guest"); //預設密碼
factory.setPort(5672); //預設埠,對就是這麼屌全部預設的
this.connection = factory.newConnection(); //建立連結
this.channel = this.connection.createChannel();
}
public void createAndBindQueue() throws IOException {
/**
* 建立了一個交換器,型別為 direct 非持久化 自動刪除 沒有額外引數
*/
this.channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,true,null); //最好也建立一下交換器,反正已經建立也沒有關係
/**
* 建立了一個佇列, 名稱為 QUEUE_A 非持久化 非獨有的 自動刪除的 沒有額外刪除的
*/
this.channel.queueDeclare("QUEUE_A",false,false,true,null);
this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");
}
public static void main(String args[]) throws IOException {
final Consumer consumer = new Consumer();
consumer.createConnectionAndChannel();
consumer.createAndBindQueue();
System.out.println("等待訊息中。。。。");
new Thread(new Runnable() {
public void run() {
try {
/**
* 訂閱訊息,訂閱佇列QUEUE_A 獲得訊息後自動確認
*/
consumer.channel.basicConsume("QUEUE_A", true, new com.rabbitmq.client.Consumer() {
public void handleConsumeOk(String consumerTag) {
}
public void handleCancelOk(String consumerTag) {
}
public void handleCancel(String consumerTag) throws IOException {
}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
}
public void handleRecoverOk(String consumerTag) {
}
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message : " + SerializationUtils.deserialize(body));
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
生產者執行結果:
message : message_A ! 傳送成功
message : message_B ! 傳送成功
message : message_C ! 傳送成功
message : message_D ! 傳送成功
message : message_E ! 傳送成功
message : message_F ! 傳送成功
消費者執行結果:
等待訊息中。。。。
message : message_A
message : message_B
message : message_C
message : message_D
message : message_E
message : message_F
專案我是用maven建立的貼一下maven 的pom.xml檔案
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.maxfunner</groupId>
<artifactId>rabbitlearning</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitlearning</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
</project>
最後一個重點問題
訊息“黑洞”,如果在沒有系結佇列和交換器之前,所有發出的訊息都無法匹配到相應的佇列當中,那些訊息將永遠不會被消費。而且confirm的callback也是會返回成功的即使訊息進入了訊息“黑洞”。所以在傳送訊息之前 必須確定佇列已經系結,確保訊息能分配到相應的佇列當中。 測試很簡單,上面的DEMO 先執行 Producer 顯示所有訊息傳送成功,然後再執行 Consumer 發現沒有訊息可以接收。 再嘗試先執行Consumer 再執行 Producer 就發現一切都正常了,這也是為什麼我把autoDelete設定為true的原因,有了autoDelete當佇列沒有人用的時候就會自動刪除。所以每次執行都可以測試出問題。要保證訊息能夠到達指定的佇列最好也在Producer中建立佇列 而且進行相關的系結 然後再傳送訊息 修改一下 Producer 的其中一方法即可。 不寫了 累了~
public void initChannelAndCreateExchange() throws IOException {
this.channel.confirmSelect(); //啟用訊息確認已經投遞成功的回撥
/**
* 建立了一個交換器,型別為 direct 非持久化 自動刪除 沒有額外引數
*/
this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);
this.channel.addConfirmListener(new ConfirmListener() {
/**
* 成功的時候回撥【這個是當訊息到達交換器的時候回撥】
* @param deliveryTag 每一條訊息都有一個唯一ID【只是同一個channel唯一】,每次發出訊息遞增1 因為同一個channel所有也保證了訊息的流水性。
* @param multiple
* @throws IOException
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
String message = messageMap.get(deliveryTag);
System.out.println("message : " + message + " ! 傳送成功");
messageMap.remove(message);
//最後一個訊息都搞掂之後 關閉所有東西
if (deliveryTag >= maxID) {
closeAnything();
}
}
/**
* 失敗的時候回撥
* @param deliveryTag 每一條訊息都有一個唯一ID【只是同一個channel唯一】,每次發出訊息遞增1 因為同一個channel所有也保證了訊息的流水性。
* @param multiple
* @throws IOException
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
String message = messageMap.get(deliveryTag);
System.out.println("message : " + message + " ! 傳送失敗");
messageMap.remove(message); //傳送失敗就不重發了,發脾氣
//最後一個訊息都搞掂之後 關閉所有東西
if (deliveryTag >= maxID) {
closeAnything();
}
}
});
/**
* 建立了一個佇列, 名稱為 QUEUE_A 非持久化 非獨有的 自動刪除的 沒有額外刪除的
*/
this.channel.queueDeclare("QUEUE_A",false,false,true,null);
this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");
CentOS7下安裝RabbitMQ http://www.linuxidc.com/Linux/2016-11/136812.htm
CentOS 7.2 下 RabbitMQ 叢集搭建 http://www.linuxidc.com/Linux/2016-12/137812.htm
CentOS7環境安裝使用專業的訊息佇列產品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm
設定與管理RabbitMQ http://www.linuxidc.com/Linux/2016-11/136815.htm
RabbitMQ概念及環境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm
RabbitMQ入門教學 http://www.linuxidc.com/Linux/2015-02/113983.htm
相關文章