首頁 > 軟體

RabbitMQ 學習初入門

2020-06-16 17:29:24

RabbitMQ是一個訊息佇列,我們可以使用RabbitMQ 做訊息佇列,訊息通知的業務功能,而且根據網上的不可靠訊息得出,RabbitMQ 的效能水平甚至比 activeMQ 還要好,所以也是我選擇認真去學習RabbitMQ的原因,當然我也有做個關於 activeMQ 的一些簡單的 Demo 有機會的話可以分享出來~

Rabbit 使用 Erlang 來編寫的AMQP 伺服器。 什麼是 AMQP 本人已經百度給大家了:

AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協定,是應用層協定的一個開放標準,為訊息導向中介層設計。基於此協定的用戶端與訊息中介軟體可傳遞訊息,並不受用戶端/中介軟體不同產品,不同的開發語言等條件的限制。Erlang中的實現有 RabbitMQ等。

哎喲,先從按照說起吧~ 我使用macbook 來進行學習和開發的 順便提一下 本人是位果粉,別噴~ 使用 MAC OS 安非常簡單,還我是建議使用 MAC OS 的同學使用 brew 進行開發,沒有別的 就是因為方便。

使用這個命令之前 如果沒有安裝過 Homebrew 就安裝一下吧 有機會我也分享一下安裝方式 順便介紹一下這個讓你懶癌晚期的Homebrew。

開啟終端輸入:

 brew install rabbitmq

然後會告訴你 無法獲得 /usr/local/sbin 的寫入許可權。 因為是英文的錯誤提示,本人會看不會寫 所以直接寫中文給各位看了~ 還是那句 別噴~ 謝謝
好,接下來讓他有許可權,其實這個目錄是沒有的sbin。

 mkdir /usr/local/sbin       建立目錄
 chmod 777 /usr/local/sbin   開許可權,全開別煩
 brew link rabbitmq          link 一下

然後就沒有然後了

啟動 rabbitMQ

MacBook-Pro:~ Tony$ /usr/local/sbin/rabbitmq-server 

              RabbitMQ 3.5.6. Copyright (C) 2007-2015 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker... completed with 10 plugins.

目前為止已經安裝完 rabbitMQ 並且啟動起來 了 簡單到飛

開始廢話 說說概念,別方 我也是抄書的~

消費者和生產者,由於書本的篇幅太長,就抄了!簡單明瞭告訴你們,訊息的產生者就是生產者,獲得訊息的稱為訊息消費者。我QQ M了你一下 我就是訊息的生產者了,你在QQ看到我的訊息了,你就是訊息的消費者了,這才是正常寫書方式嘛··· 舉個開發的業務背景的例子,我支付了一張訂單,這個業務產生了幾個次要的業務,例如積分業務,我購買了訂單,訂單核心功能處理訂單的業務,然後生產了一個訊息並通過rabbitMQ 傳送了一個訊息,內容為產生了一個訂單,然後訂閱了訊息的訊息消費者就會消費訊息,然後根據訊息處理自己的業務,該幹嘛幹嘛去~ 積分業務就去加你的積分等等。當然這個只是舉例,別吐槽 可以在台伺服器或者一個業務方法上實現。 上面的舉例對於一些分散式的微服務上是非常常見的方式了~

幾個重要的概念:

通道:在連線到rabbitMQ 之後,你將可以建立並獲得一個AMQP通道,通道是建立在“真實的”TCP連線內的虛擬連結。所有的AMQP命令都是通過 AMQP 通道發出去的,每條通道會被指派一個唯一的ID。所有發布訊息還是訂閱都是通過AMQP 通道去處理的。這個概念非常像埠號【但是不是埠啊 ,別說我混淆你】,而且他的英文名字可以理解到他的意思channel 有使用NIO 的開發者已經比較了解~ 其實使用AMQP通道的原因非常簡單,建立TCP連線的效能代價是非常高的,當你在不同的執行緒當中去建立TCP連線的方式去和RabbitMQ進行通訊的代價是十分高的,但是在多個不同的執行緒當中通過channel 我們就可以通過同一個TCP連線進行通訊了。好像寫得好亂不知道他們理不理解我寫什麼,唉 算了 本人的水平有限,況且大多數情況下都是我自己看而已,放棄~ 抽象圖如下 醜,不過先將就~

交換器 與 路由鍵: 這個好難解釋啊~ 直接說工作流程吧~ 哎喲其實應該把 佇列寫到這裡來,算了吧 不改了。 先記住,交換器當中裡面有N個佇列,而路由鍵 是讓交換器知道訊息應該放到哪個佇列當中去。別廢話舉例: 產生訊息 要告訴 RabbitMQ 這個訊息是放到那個交換器上,【注意:交換器是由開發者去建立的你可以搞N個交換器出來,沒有管你】
上程式碼:
this.channel.basicPublish(“hello-exchange”,”hola”,properties, SerializationUtils.serialize(msg));
首先 hello-exchange 是交換器的名字, hola 是路由鍵 這個讓hello-exchange 去判斷應該分發到那個佇列當中,SerializationUtils.serialize(msg) 將一個訊息序列化。 channel 就不用說啦上面說了,是一個AMQP通道。
交換器有很多不同的型別一會會一一介紹。交換器的型別不同也註定了訊息會分發到哪個佇列當中,所有尤其重要。

工作流程:

1、消費者訂閱訊息,先建立一個交換器,如果交換器已經存在即返回一個交換器。當然你可以passive 為 true 如果 passive 為 true , 交換器已經存在返回一個已經存在的交換器,否則報錯。我暫時不知道用在哪裡,所以 先記住一般情況下 建立一個交換器,如果存在就返回一個已經存在的交換器,如果不存在則建立並返回。

JAVA 程式碼如下【直接給原始碼的方法定義,是不是很貼心】:

備註是自己翻譯的別噴

/**
     * Declare an exchange, via an interface that allows the complete set of
     * arguments.
     * @see com.rabbitmq.client.AMQP.Exchange.Declare
     * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
     * @param exchange the name of the exchange  【交換器名稱】
     * @param type the exchange type 【交換器型別 一會說 別著急】
     * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) 【是否持久化交換器,這個後面說,還是簡單說一下,糾結帝~ 就是交換器中的訊息會持久化,就是會存在硬碟當中,宕機或重新啟動服務時會自動恢復當時會犧牲效能,除非你的SSD硬碟好快,當我沒有說過~】
     * @param autoDelete true if the server should delete the exchange when it is no longer in use  【當這個交換器 已經沒有人使用的時候 會自動刪除,longer 長時期 當時我不知道長時期是什麼時候,所以我會記住是 沒人用就自己刪掉 自動消失】
     * @param internal true if the exchange is internal, i.e. can't be directly
     * published to by a client. 【不知道什麼鬼意思,如果是內部 不會直接傳送到用戶端,我一般寫false ,真心不知道他什麼意思,後面學下去應該明白】
     * @param arguments other properties (construction arguments) for the exchange 【其他引數 暫時沒有用到】
     * @return a declaration-confirm method to indicate the exchange was successfully declared  
     * @throws java.io.IOException if an error is encountered 【會丟擲IO異常】
     */
Exchange.DeclareOk exchangeDeclare(String exchange,String type, boolean durable, boolean autoDelete, boolean internal,Map<String, Object> arguments) throws IOException;

2、消費者建立佇列,並繫結佇列到交換器當中,上寫路由鍵,讓交換器知道這個路由鍵的訊息是跑到這個佇列當中的。

JAVA 程式碼如下【直接給原始碼的方法定義】:

/**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue  【佇列名稱】 
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)  【持久化,這裡特別說一下,如果你想訊息是持久化的,必須訊息是持久化的,交換器也是持久化的,佇列更是持久化的,其中一個不是也無法恢復訊息】
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)  【私有的,獨有的。 這個佇列之後這個應用可以消費,上面的英文注釋是 說restricted to this connection  就是限制在這個連線可以消費,就是說不限制channel通道咯,具體沒有試過,但是應該是這樣,除非備註騙我,我讀得書少,你唔好呃我!!!】 
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 【沒有人使用自動刪除】 注意:如果exclusive為true 最好 autodelete都為true 至於為什麼 這麼簡單自己想~
     * @param arguments other properties (construction arguments) for the queue 【其他引數沒有玩過】
     * @return a declaration-confirm method to indicate the queue was successfully declared  
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;/**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
     * @param arguments other properties (construction arguments) for the queue
     * @return a declaration-confirm method to indicate the queue was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
係結佇列和交換器 並指定路由鍵
/**
     * Bind a queue to an exchange.
     * @see com.rabbitmq.client.AMQP.Queue.Bind
     * @see com.rabbitmq.client.AMQP.Queue.BindOk
     * @param queue the name of the queue  【佇列名稱】
     * @param exchange the name of the exchange 【交換器名稱】
     * @param routingKey the routine key to use for the binding 【路由鍵】
     * @param arguments other properties (binding parameters) 【其他引數 還是那句沒有玩過】
     * @return a binding-confirm method if the binding was successfully created
     * @throws java.io.IOException if an error is encountered
     */
    Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

3、然後消費者就去訂閱訊息咯,注意在JAVA裡面的訂閱方法會產生執行緒阻塞的。

JAVA 程式碼:

訂閱訊息並消費
 /**
     * Start a non-nolocal, non-exclusive consumer, with
     * a server-generated consumerTag.
     * @param queue the name of the queue 【所訂閱消費的佇列】
     * @param autoAck true if the server should consider messages 【是否為自動確定訊息,好像TCP的ack syn 啊,可怕的7層模型!!!一般寫true就可以】
     * acknowledged once delivered; false if the server should expect
     * explicit acknowledgements
     * @param callback an interface to the consumer object  【回撥callback 這個馬上上程式碼看看】
     * @return the consumerTag generated by the server
     * @throws java.io.IOException if an error is encountered
     * @see com.rabbitmq.client.AMQP.Basic.Consume
     * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
     * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
     */
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

接收到訊息之後,馬上有回撥,來看看回撥介面:

/**
 * Interface for application callback objects to receive notifications and messages from
 * a queue by subscription.
 * Most implementations will subclass {@link DefaultConsumer}.
 * <p/>
 * The methods of this interface are invoked in a dispatch
 * thread which is separate from the {@link Connection}'s thread. This
 * allows {@link Consumer}s to call {@link Channel} or {@link
 * Connection} methods without causing a deadlock.
 * <p/>
 * The {@link Consumer}s on a particular {@link Channel} are invoked serially on one or more
 * dispatch threads. {@link Consumer}s should avoid executing long-running code
 * because this will delay dispatch of messages to other {@link Consumer}s on the same
 * {@link Channel}.
 *
 * @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
 * @see Channel#basicCancel
 */
public interface Consumer {
    /**
     * Called when the consumer is registered by a call to any of the
     * {@link Channel#basicConsume} methods.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleConsumeOk(String consumerTag);

    /**
     * Called when the consumer is cancelled by a call to {@link Channel#basicCancel}.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleCancelOk(String consumerTag);

    /**
     * Called when the consumer is cancelled for reasons <i>other than</i> by a call to
     * {@link Channel#basicCancel}. For example, the queue has been deleted.
     * See {@link #handleCancelOk} for notification of consumer
     * cancellation due to {@link Channel#basicCancel}.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @throws IOException
     */
    void handleCancel(String consumerTag) throws IOException;

    /**
     * Called when either the channel or the underlying connection has been shut down.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param sig a {@link ShutdownSignalException} indicating the reason for the shut down
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    /**
     * Called when a <code><b>basic.recover-ok</b></code> is received
     * in reply to a <code><b>basic.recover</b></code>. All messages
     * received before this is invoked that haven't been <i>ack</i>'ed will be
     * re-delivered. All messages received afterwards won't be.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleRecoverOk(String consumerTag);

    /**
     * Called when a <code><b>basic.deliver</b></code> is received for this consumer.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body (opaque, client-specific byte array)
     * @throws IOException if the consumer encounters an I/O error while processing the message
     * @see Envelope  寶寶累了 看重點,這個方法就是訊息到達的時候回撥的方法其他你們喜歡研究 可以認真看看英文備註。
     */
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;
}

注意一下:其實也可以使用其他訂閱方式去消費訊息的,可以使用get的方式去獲得一條訊息,然後會在佇列當中給你一條訊息,但是你會想就不需要使用上面的訂閱方法啦,直接跑個迴圈就可以啦。其實get方法會開啟訂閱獲得一條訊息後關閉訂閱,這樣會產生不必要的效能開銷的,除非老闆讓你搞卡一點,讓客戶掏點優化費,不然就別這樣幹了。get的方法這裡就先不介紹啦~

4、上面所說的都是訊息消費者的工作,這個step開始說訊息生產者要做的。開啟一個交換器,當然這個交換器應該是存在不用建立的因為 上面所說消費者已經建立過了,但是其實誰去先建立都是可以的,消費者也可以建立,生產者也可以,這個倒是沒有什麼關係,主要是看你的業務需求【這句話甩鍋用】。由於已經貼出過建立交換器的程式碼所以就不貼了,看上面。

與上面的交換器建立一樣。不貼程式碼了自己拖上去看吧~

5、好像沒有什麼別的事情了,就是產生一個訊息然後發過去。上程式碼

JAVA程式碼:

/**
     * Publish a message
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @param exchange the exchange to publish the message to  【傳送到那個交換器上】
     * @param routingKey the routing key  【路由鍵 決定訊息去哪個佇列裡面混】
     * @param props other properties for the message - routing headers etc  【其他訊息引數, 哎~ 這個我玩過,一會DEMO時間,表演一下】
     * @param body the message body 【訊息體,一般把物件序列化一下發過去】
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

然後就沒有然後了 上面 第三步的 訂閱方法中的回撥 就會有這個訊息了。 好又到說概念的時候了。 困的同學先休息。

訊息的投遞方式,我也睏了其實:

【1】 訊息MESSAGE_A 到達 queue_A 佇列
【2】 RabbitMQ 把 MESSAGE_A 傳送到 APPLICATION A(消費者)
【3】 APPLICATION A 確定收到了訊息 發個ack 高興一下
【4】 RabbitMQ 把 訊息從 queue_A 佇列刪除。

上面其實也有說過,autoack引數 有印象吧~ 所以autoack 設定成true 就沒錯了。這裡就可以開展其他話題了,如果多個訂閱者怎麼辦呢,是不是多個訂閱者都會收到訊息呢,答案是否定的。RabbitMQ的佇列會對訂閱者做一個迴圈,例如目前有兩個訂閱者訂閱了同一個佇列,serverA 和 serverB 他會迴圈去傳送訊息,佇列有 ID 1-10 的訊息,ID 1訊息會由 serverA 處理 ID 2 訊息會由 serverB 處理 ID 3 訊息會由 serverA 去處理 如此類推。
如果serverA 在傳送ACK應答給 RabbitMQ之前 斷開連線【就是服務掛了,宕機了】RabbitMQ 會認為這個訊息沒有處理,即沒有消費到這個訊息,會把這個訊息傳送到 serverB 去處理。
而且在訊息沒有ACK之前,RabbitMQ不會發你第二條訊息的,所以如果你想等待訊息的任務處理完之後再給第二個訊息的話,可以將autoDelete設定成false,這樣你就可以在訊息處理完之後再去ack。???樣也是一個非常通用的場景,以防扎堆處理訊息。
其實這樣的設計也是非常好的,如果你出現了業務錯誤,執行訊息處理的時候,剛好出現問題了,你可以通過斷開連線的方式【這裡所指的斷開是斷開RabbitMQ的connection】讓這個訊息交給下個訂閱者,【這裡說一下,如果佇列沒有消費者去訂閱訊息的話,訊息會存在RabbitMQ當中等待有消費者去訂閱再去傳送】,當然這也是在你沒有ack的情況下。在高階版本的RabbitMQ中可以使用reject命令,讓這個訊息直接傳遞到下個訂閱者。【在RabbitMQ2.0可以使用】

AMQP通道channel與訂閱

如果 一個channel 訂閱了一個佇列就不能訂閱別的佇列了,也就是說一個channel只能訂閱一個佇列。所以你需要取消原來的訂閱,並將通道設定為“傳輸”模式。後面有時間寫後面的文章可能會說到。

交換器的型別:

direct :這個簡單,只要路由鍵一模一樣就投遞到相應的佇列當中。


fanout :這個也簡單,訊息會投遞到所有系結到這個交換器的佇列當中。


topic :這個就複雜一點了,我很客觀的簡單就簡單 複雜就複雜 從不忽悠。但是這個也好常用不能不學,好累。在路由鍵當中可以使用萬用字元。怎麼說呢,好糾結啊 不知道怎麼解釋啊 怎麼辦 線上等。

舉個例子吧,係結佇列:

//係結佇列 路由鍵是 tony.teamA 有路由鍵為tony.teamA的訊息就投遞到這裡
this.channel.queueBind("queueA","topic_exchangeA","tony.teamA",null); 

//係結佇列 路由鍵是 yan.teamA 有路由鍵為tony.teamB的訊息就投遞到這裡
this.channel.queueBind("queueB","topic_exchangeA","yan.teamA",null); 

//係結佇列 路由鍵是 *.teamA  有路由鍵為.teamA結尾的訊息就投遞到這裡
this.channel.queueBind("queueC","topic_exchangeA","*.teamA",null); 

//係結佇列 路由鍵是 chao.teamB 有路由鍵為chao.teamB的訊息就投遞到這裡
this.channel.queueBind("queueD","topic_exchangeA","chao.teamB",null); 

//係結佇列 路由鍵是 *.teamB 有路由鍵為.teamB結尾的訊息就投遞到這裡
this.channel.queueBind("queueE","topic_exchangeA","*.teamB",null); 

//係結佇列 路由鍵是 # 所有在這個交換器的訊息都投遞到這裡
this.channel.queueBind("queueF","topic_exchangeA","#",null); 

// queueA[路由鍵:tony.teamA]、queueC[路由鍵:*.teamA]、queueF[路由鍵:#]會收到訊息
this.channel.basicPublish("topic_exchangeA","tony.teamA",properties,SerializationUtils.serialize(msg));

// queueD[路由鍵:chao.teamB]、queueE[路由鍵:*.teamB]、queueF[路由鍵:#]會收到訊息
this.channel.basicPublish("topic_exchangeA","chao.teamB",properties,SerializationUtils.serialize(msg));

然後基本的rabbitMQ 的東西就搞好了。不過這只是開始~

簡單說一下虛擬主機和隔離吧,簡單來說想MySQL 你可以建立很多個庫,裡面有很多個表。然後呢 rabbitMQ可以建立很多個虛擬主機,虛擬主機裡面有很多個交換器,交換器裡面有很多個佇列,解釋得完美。預設會提供一個 預設虛擬主機 vhost : “/”。後面找時間再說這個 vhost 吧。

重頭大戲上DEMO,由於方便我閱讀回憶,所以我忽略的封裝性,一切以容易快速看懂和回憶為目標。別噴~

生產者:

package com.maxfunner;

import com.maxfunner.mq.EndPoint;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Producer
 */
public class Producer {


    private Connection connection;
    private Channel channel;
    private Map<Long, String> messageMap = new HashMap<Long, String>();
    private int maxID = 0;

    private static final String EXCHANGE_NAME = "MY_EXCHANGE";


    public void createConnectionAndChannel() throws IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");  //伺服器地址
        factory.setUsername("guest");  //預設使用者名稱
        factory.setPassword("guest"); //預設密碼
        factory.setPort(5672); //預設埠,對就是這麼屌全部預設的

        this.connection = factory.newConnection();  //建立連結

        this.channel = this.connection.createChannel();

    }


    public void initChannelAndCreateExchange() throws IOException {

        this.channel.confirmSelect();   //啟用訊息確認已經投遞成功的回撥


        /**
         * 建立了一個交換器,型別為 direct 非持久化 自動刪除  沒有額外引數
         */
        this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);

        this.channel.addConfirmListener(new ConfirmListener() {


            /**
             * 成功的時候回撥【這個是當訊息到達交換器的時候回撥】
             * @param deliveryTag   每一條訊息都有一個唯一ID【只是同一個channel唯一】,每次發出訊息遞增1 因為同一個channel所有也保證了訊息的流水性。
             * @param multiple
             * @throws IOException
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 傳送成功");
                messageMap.remove(message);

                //最後一個訊息都搞掂之後 關閉所有東西
                if (deliveryTag >= maxID) {
                    closeAnything();
                }

            }

            /**
             * 失敗的時候回撥
             * @param deliveryTag   每一條訊息都有一個唯一ID【只是同一個channel唯一】,每次發出訊息遞增1 因為同一個channel所有也保證了訊息的流水性。
             * @param multiple
             * @throws IOException
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 傳送失敗");
                messageMap.remove(message); //傳送失敗就不重發了,發脾氣

                //最後一個訊息都搞掂之後 關閉所有東西
                if (deliveryTag >= maxID) {
                    closeAnything();
                }

            }
        });

    }


    public void sendMessage(String message) throws IOException {


        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentType("text/plain")  //指定是一個文字
                .build();


        // 傳送一個訊息 到 EXCHANGE_NAME 的交換器中  路由鍵為 KEY_A  傳送 message 之前序列化一下 具體用什麼包上面import自己看
        this.channel.basicPublish(EXCHANGE_NAME, "KEY_A", properties, SerializationUtils.serialize(message));

    }

    public void closeAnything() throws IOException {
        this.channel.close();   //跪安吧 小channel
        this.connection.close(); //你也滾吧 connection
    }


    public static void main(String[] args) throws IOException {


        Producer producer = new Producer();
        producer.createConnectionAndChannel();
        producer.initChannelAndCreateExchange();


        List<String> messageList = new ArrayList<String>();
        messageList.add("message_A");
        messageList.add("message_B");
        messageList.add("message_C");
        messageList.add("message_D");
        messageList.add("message_E");
        messageList.add("message_F");


        producer.maxID = messageList.size();    //記錄最後一個ID 當最後一個訊息傳送成功後關閉連線

        //注意:因為channel產生的ID 是從1開始的
        for (int i = 1; i <= messageList.size(); i++) {

            producer.messageMap.put(new Long(i), messageList.get(i - 1));    //這裡看懂了嗎?沒看懂也沒有辦法了,這裡我真不知道怎麼解釋
            producer.sendMessage(messageList.get(i - 1));

        }


    }
}

消費者:

package com.maxfunner;

import com.maxfunner.mq.QueueConsumer;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Consumer
 */
public class Consumer {

    private Connection connection;
    private Channel channel;
    private Map<Integer,String> messageMap = new HashMap<Integer, String>();

    private static final String EXCHANGE_NAME = "MY_EXCHANGE";


    /**
     * 對,你猜得一點都沒有錯,我是複製的
     * @throws IOException
     */
    public void createConnectionAndChannel() throws IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");  //伺服器地址
        factory.setUsername("guest");  //預設使用者名稱
        factory.setPassword("guest"); //預設密碼
        factory.setPort(5672); //預設埠,對就是這麼屌全部預設的

        this.connection = factory.newConnection();  //建立連結

        this.channel = this.connection.createChannel();

    }


    public void createAndBindQueue() throws IOException {

        /**
         * 建立了一個交換器,型別為 direct 非持久化 自動刪除  沒有額外引數
         */
        this.channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,true,null); //最好也建立一下交換器,反正已經建立也沒有關係

        /**
         * 建立了一個佇列, 名稱為 QUEUE_A  非持久化 非獨有的 自動刪除的 沒有額外刪除的
         */
        this.channel.queueDeclare("QUEUE_A",false,false,true,null);


        this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");


    }



    public static void main(String args[]) throws IOException {


        final Consumer consumer = new Consumer();

        consumer.createConnectionAndChannel();

        consumer.createAndBindQueue();

        System.out.println("等待訊息中。。。。");

        new Thread(new Runnable() {
            public void run() {

                try {

                    /**
                     * 訂閱訊息,訂閱佇列QUEUE_A  獲得訊息後自動確認
                     */
                    consumer.channel.basicConsume("QUEUE_A", true, new com.rabbitmq.client.Consumer() {

                        public void handleConsumeOk(String consumerTag) {

                        }

                        public void handleCancelOk(String consumerTag) {

                        }

                        public void handleCancel(String consumerTag) throws IOException {

                        }

                        public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

                        }

                        public void handleRecoverOk(String consumerTag) {

                        }

                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                            System.out.println("message : " + SerializationUtils.deserialize(body));

                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();


    }

}

生產者執行結果:

message : message_A ! 傳送成功
message : message_B ! 傳送成功
message : message_C ! 傳送成功
message : message_D ! 傳送成功
message : message_E ! 傳送成功
message : message_F ! 傳送成功

消費者執行結果:

等待訊息中。。。。
message : message_A
message : message_B
message : message_C
message : message_D
message : message_E
message : message_F

專案我是用maven建立的貼一下maven 的pom.xml檔案

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.maxfunner</groupId>
    <artifactId>rabbitlearning</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbitlearning</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.0.4</version>
        </dependency>

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.1</version>
        </dependency>

    </dependencies>
</project>

最後一個重點問題

訊息“黑洞”,如果在沒有系結佇列和交換器之前,所有發出的訊息都無法匹配到相應的佇列當中,那些訊息將永遠不會被消費。而且confirm的callback也是會返回成功的即使訊息進入了訊息“黑洞”。所以在傳送訊息之前 必須確定佇列已經系結,確保訊息能分配到相應的佇列當中。 測試很簡單,上面的DEMO 先執行 Producer 顯示所有訊息傳送成功,然後再執行 Consumer 發現沒有訊息可以接收。 再嘗試先執行Consumer 再執行 Producer 就發現一切都正常了,這也是為什麼我把autoDelete設定為true的原因,有了autoDelete當佇列沒有人用的時候就會自動刪除。所以每次執行都可以測試出問題。要保證訊息能夠到達指定的佇列最好也在Producer中建立佇列 而且進行相關的系結 然後再傳送訊息 修改一下 Producer 的其中一方法即可。 不寫了 累了~

public void initChannelAndCreateExchange() throws IOException {

        this.channel.confirmSelect();   //啟用訊息確認已經投遞成功的回撥


        /**
         * 建立了一個交換器,型別為 direct 非持久化 自動刪除  沒有額外引數
         */
        this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);

        this.channel.addConfirmListener(new ConfirmListener() {


            /**
             * 成功的時候回撥【這個是當訊息到達交換器的時候回撥】
             * @param deliveryTag   每一條訊息都有一個唯一ID【只是同一個channel唯一】,每次發出訊息遞增1 因為同一個channel所有也保證了訊息的流水性。
             * @param multiple
             * @throws IOException
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 傳送成功");
                messageMap.remove(message);

                //最後一個訊息都搞掂之後 關閉所有東西
                if (deliveryTag >= maxID) {
                    closeAnything();
                }

            }

            /**
             * 失敗的時候回撥
             * @param deliveryTag   每一條訊息都有一個唯一ID【只是同一個channel唯一】,每次發出訊息遞增1 因為同一個channel所有也保證了訊息的流水性。
             * @param multiple
             * @throws IOException
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 傳送失敗");
                messageMap.remove(message); //傳送失敗就不重發了,發脾氣

                //最後一個訊息都搞掂之後 關閉所有東西
                if (deliveryTag >= maxID) {
                    closeAnything();
                }

            }
        });

        /**
         * 建立了一個佇列, 名稱為 QUEUE_A  非持久化 非獨有的 自動刪除的 沒有額外刪除的
         */
        this.channel.queueDeclare("QUEUE_A",false,false,true,null);


        this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");

 

CentOS7下安裝RabbitMQ  http://www.linuxidc.com/Linux/2016-11/136812.htm

CentOS 7.2 下 RabbitMQ 叢集搭建  http://www.linuxidc.com/Linux/2016-12/137812.htm

CentOS7環境安裝使用專業的訊息佇列產品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm

設定與管理RabbitMQ  http://www.linuxidc.com/Linux/2016-11/136815.htm

RabbitMQ概念及環境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm

RabbitMQ入門教學  http://www.linuxidc.com/Linux/2015-02/113983.htm


IT145.com E-mail:sddin#qq.com