2021-05-12 14:32:11
RabbitMQ入門教學
前面宣告本文都是RabbitMQ的官方指南翻譯過來的,由於本人水平有限難免有翻譯不當的地方,如發現不對的地方,請聯絡下我,好及時改正。好了,正文開始:
RabbitMQ 是一個訊息代理。這主要的原理十分簡單,就是通過接受和轉發訊息。你可以把它想象成郵局:當你將一個包裹送到郵局,你會相信郵遞員先生最終會將郵件送到接件人手上。RabbitMQ就好比一個郵箱,郵局或郵遞員。
郵局和RabbitMQ兩種主要的不同之處在於,RabbitMQ不處理檔案,而是接受,並儲存和以二進位制形式將訊息轉發。
RabbitMQ,在訊息的傳送過程中,我們使用一些標準稱呼。
生產過程就像傳送過程,傳送訊息的程式就是一個生產者,我們使用“P”來描述它。
佇列是好比郵筒的稱呼,它位於RabbitMQ內部,雖然訊息流通過RabbitMQ和你的應用程式,但是它們僅僅儲存在佇列中。一個佇列沒有範圍限制,你可以想儲存多少就儲存多少,本質上來說它是無限大的快取。多個生產者可以通過一個佇列傳送訊息,同樣多個消費者也可以通同一個訊息佇列中接收訊息。佇列是畫成這樣,名字在它的上面:
消費過程與接收相似,一個消費者通常是一個等著接受訊息的程式,我們使用"C"來描述:
注意,那生產者,消費者和代理者不需要一定在一個機器上,事實上,大多數應用程式中,他們並不在一個機器上。
“Hello World”
(使用Java用戶端)
在這部分指南中,我們將要使用java寫兩個程式;一個傳送簡單訊息的生產者和一個接收訊息並輸出出來的消費者。我們會忽視掉一些Java API的細節,為了開始僅僅精選在這簡單的事情上,這是一個"Hello World"訊息。
Java 用戶端庫
RabbitMQ 遵循AMQP協定,那是一個開放的,並且通用的訊息協定。在不同語言中有數種AMQP用戶端,我們使用由RabbitMQ提供的Java用戶端。
下載用戶端庫包,檢驗簽名,將它解壓縮到你的工作路徑,從解壓到的路徑中提取JAR檔案:$ unzip rabbitmq-java-client-bin-*.zip $ cp rabbitmq-java-client-bin-*/*.jar ./
(RabbitMQ Java用戶端也存在Maven中央庫中,
groupId
是com.rabbitmq
,artifactId
是amqp-client
.)現在我們已經有了Java用戶端和依賴檔案,我們可以寫一些程式碼了。
傳送
我們將會讓我們的訊息傳送者傳送訊息,我們的接收者接收訊息。傳送者連線到RabbitMQ上,傳送一個簡單的訊息,然後退出。
在Send.java
,我們需要引入一些類:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
建立這個類,為佇列命名:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException {
...
}
}
接著,我們建立一個伺服器的連線:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
抽象的socket連線,注意協定版本的處理以及授權,諸如此類的事情。
這裡我們連線到本地機器上的代理,因此它是localhost
。如果我們想連線到不同機器上的代理,只需要說明它的主機名和IP地址。
接下來我們建立一個通道,獲取操作的大多數API都位於這上。
對於傳送,我們必須宣告一個傳送佇列,然後我們把訊息傳送到這個佇列上:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
Declaring a queue is idempotent - it will only be created if it doesn't exist already. The message content is a byte array, so you can encode whatever you like there.
Lastly, we close the channel and the connection;
宣告一個佇列是冪等的,僅僅在要宣告的佇列不存在時才建立。訊息內容是二進位制陣列,所以你可以隨你喜好編碼。
channel.close();
connection.close();
Here's the whole Send.java class.
傳送沒有起作用
如果你是第一次使用RabbitMQ並且你沒有看到"Sent"訊息,你可能抓耳撓腮的想到底是哪裡出的問題。可能是代理啟動時沒有足夠空間(預設它需要至少1Gb 空間),因此拒絕接受訊息。通過檢查代理的紀錄檔檔案來確定這個問題,必要情況下可以降低限制大小。組態檔的文件將會告訴你怎樣設定
disk_free_limit
。
接收
上面程式碼是構建我們的傳送者。我們的接收者是從RabbitMQ中提取訊息,所以不像傳送者那樣傳送一個簡單的訊息,我們需要一直執行監聽訊息並且輸出訊息。
在Recv.java中的程式碼有與Send中幾乎相同的參照:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
這額外的QueueingConsumer
類是用來快取從伺服器那裡發出來的資訊。
跟建立傳送者相同,我們開啟一個連線和一個通道,宣告一個我們要消費的佇列。注意要與傳送的佇列相匹配。
java.lang.InterruptedException {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
注意我們在這裡同樣宣告了一個佇列。以為我們可能在傳送者之前啟動接收者,在我們從中獲取訊息之前我們想要確定這佇列是否真實存在。
我們通知伺服器通過此佇列給我們傳送訊息。因此伺服器會非同步的給我們推播訊息,在這裡我們提供一個回撥物件用來快取訊息,直到我們準備好再使用它們。這就是QueueingConsumer
所做的事。
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
QueueingConsumer.nextDelivery()
在另一個來自伺服器的訊息到來之前它會一直阻塞著。
這是整個Recv.java類。
把所有放在一起
你可以在RabbitMQ Java用戶端的類路徑上編譯這些檔案:
$ javac -cp rabbitmq-client.jar Send.java Recv.java
為了執行它們,你需要rabbitma-client.jar
和它在類路徑上的的依賴檔案。在一個終端上,執行傳送者:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send
然後,執行接收者:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
在windows環境中,我們使用分號代替冒號來分隔類路徑上的選項。
接收者將會輸出從RabbitMQ中獲取到來自傳送者的訊息。接收者會一直保持執行,等待訊息(使用Ctrl-C
停止),所以試著用另一個終端執行傳送者。
如果你想檢驗佇列,試著使用rabbitmqctl list_queues0
。
Hello World!
時間移動到第二部分,構建一個簡單的工作佇列。
提示
為了儲存輸入,你可以將類路徑設定到環境變數中$ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar $ java -cp $CP Send
或者在 Windows環境中:
> set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar > java -cp %CP% Send
CentOS 5.6 安裝RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm
RabbitMQ用戶端C++安裝詳細記錄 http://www.linuxidc.com/Linux/2012-02/53521.htm
用Python嘗試RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm
RabbitMQ叢集環境生產範例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm
Ubuntu下PHP + RabbitMQ使用 http://www.linuxidc.com/Linux/2010-07/27309.htm
在CentOS上安裝RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm
RabbitMQ概念及環境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm
更多詳情見請繼續閱讀下一頁的精彩內容: http://www.linuxidc.com/Linux/2015-02/113983p2.htm
相關文章