<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
之前專欄中,對Springboot
整合Rabbitmq
都有一系列的設定和說明,但總缺少一些必要的描述資訊。導致很多看部落格的小夥伴會私信問為什麼需要這麼設定的問題。
本篇部落格重點進行Confirm 機制
和Return 機制
的實現和說明。
RabbitMq
中,針對資料由訊息生產者
向訊息佇列
推播時,通常情況如下所示(以 Routing 方式為例):
每個Virtual Host 虛擬機器器
中,都會含有各自的Exchange
和Queue
,需要在rabbitmq web
介面中針對可以存取該Virtual Host 虛擬機器器
的使用者進行設定。
有點類似資料庫的概念,指定使用者只能操作指定的資料庫。
在使用交換機 Exchange
時,訊息生產者需要將訊息通過Channel 管道
將資料傳送給MQ
,但想過一個問題沒有:
如何 確定 訊息是否真的傳送到了指定的 MQ 中呢?
MQ
中,對此問題,提出有Confirm 機制
,對其傳送資料進行監聽,讓訊息傳送者知道訊息的傳送結果。
開發測試主要的SpringBoot 版本為2.1.4.RELEASE
。
此時只需要引入指定的amqp
依賴即可:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
完整的pom依賴如下所示:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>springboot-rabbitmq</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- 引入rabbitmq依賴 --> <artifactId>spring-boot-starter-amqp</artifactId> <artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.20</version> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.26</version> <artifactId>slf4j-log4j12</artifactId> </dependencies> </project>
增加組態檔,設定使用具體的Virtual Host
、Username
、Password
、Host
、Port
等資訊。
server: port: 80 spring: rabbitmq: host: xxxxxx port: 5672 username: xiangjiao password: bunana virtual-host: /xiangjiao publisher-confirms: true #訊息傳送到轉發器確認機制,是都確認回撥 publisher-returns: true
指定交換機名稱為:xiangjiao.exchange
。
佇列名稱為:xiangjiao.queue
。
使用Direct 直連
模式,其中關聯的Routingkey
為:xiangjiao.routingKey
。
package cn.linkpower.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfiguration { //佇列名稱 public static final String QUEUQ_NAME = "xiangjiao.queue"; //交換器名稱 public static final String EXCHANGE = "xiangjiao.exchange"; //路由key public static final String ROUTING_KEY = "xiangjiao.routingKey"; //建立佇列 @Bean public Queue getQueue(){ // 另一種方式 //QueueBuilder.durable(QUEUQ_NAME).build(); return new Queue(QUEUQ_NAME); } //範例化交換機 @Bean public DirectExchange getDirectExchange(){ //DirectExchange(String name, boolean durable, boolean autoDelete) // 另一種方式: //ExchangeBuilder.directExchange(EXCHANGE).durable(true).build(); /** * 引數一:交換機名稱;<br> * 引數二:是否永久;<br> * 引數三:是否自動刪除;<br> */ return new DirectExchange(EXCHANGE, true, false); //繫結訊息佇列和交換機 public Binding bindExchangeAndQueue(DirectExchange exchange,Queue queue){ // 將 建立的 queue 和 exchange 進行繫結 return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); }
在Springboot
中,針對MQ
訊息的傳送,採取RabbitTemplate
模板進行資料的傳送處理操作。
手動定義訊息傳送處理類
,對其RabbitTemplate
進行其他設定。
package cn.linkpower.service; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange,String routingKey,Object msg) { // 設定交換機處理失敗訊息的模式 true 表示訊息由交換機 到達不了佇列時,會將訊息重新返回給生產者 // 如果不設定這個指令,則交換機向佇列推播訊息失敗後,不會觸發 setReturnCallback rabbitTemplate.setMandatory(true); //訊息消費者確認收到訊息後,手動ack回執 rabbitTemplate.setConfirmCallback(this); // 暫時關閉 return 設定 //rabbitTemplate.setReturnCallback(this); //傳送訊息 rabbitTemplate.convertAndSend(exchange,routingKey,msg); } /** * 交換機並未將資料丟入指定的佇列中時,觸發 * channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes()); * 引數三:true 表示如果訊息無法正常投遞,則return給生產者 ;false 表示直接丟棄 * @param message 訊息物件 * @param replyCode 錯誤碼 * @param replyText 錯誤資訊 * @param exchange 交換機 * @param routingKey 路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" "); * 訊息生產者傳送訊息至交換機時觸發,用於判斷交換機是否成功收到訊息 * @param correlationData 相關設定資訊 * @param ack exchange 交換機,判斷交換機是否成功收到訊息 true 表示交換機收到 * @param cause 失敗原因 public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause)); log.info("correlationData -->"+correlationData.toString()); if(ack){ // 交換機接收到 log.info("---- confirm ----ack==true cause="+cause); }else{ // 沒有接收到 log.info("---- confirm ----ack==false cause="+cause); } }
編寫一個Controller
,將產生的資料,通過自定義的RabbitmqService
傳送至指定的Exchange交換機
中。
package cn.linkpower.controller; import cn.linkpower.config.MQConfiguration; import cn.linkpower.service.RabbitmqService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class SendMessageTx { @Autowired private RabbitmqService rabbitmqService; @RequestMapping("/sendMoreMsgTx") @ResponseBody public String sendMoreMsgTx(){ //傳送10條訊息 for (int i = 0; i < 10; i++) { String msg = "msg"+i; System.out.println("傳送訊息 msg:"+msg); // xiangjiao.exchange 交換機 // xiangjiao.routingKey 佇列 rabbitmqService.sendMessage(MQConfiguration.EXCHANGE, MQConfiguration.ROUTING_KEY, msg); //每兩秒傳送一次 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } return "send ok"; } }
http://localhost/sendMoreMsgTx
從控制檯中可以看到訊息資訊如下所示:
發現,訊息資訊傳送,都是ACK 被確認
的!
異常測試,首先需要保證mq服務中沒有對應的exchange交換機。還需要保證訊息的傳送者exchange資訊修改。
將controller中對應的訊息傳送的方式修改如下:
rabbitmqService.sendMessage("xiangjiao.exchangeError", MQConfiguration.ROUTING_KEY, msg);
重啟專案,重新請求該介面,觀察控制檯資料資訊展示:
擷取其中的一條資訊為例:
傳送訊息 msg:msg0
2022-02-28 10:34:58.686 ---- [rabbitConnectionFactory1] ---- INFO cn.linkpower.service.RabbitmqService - ---- confirm ----ack=false
cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND -
no exchange 'xiangjiao.exchangeError' in vhost '/xiangjiao', class-id=60, method-id=40)
當生產者
向Exchange
中傳送訊息,如果訊息並未成功傳送,則會觸發RabbitmqService
中設定的confirm
處理機制。
rabbitTemplate.setConfirmCallback(this); /** * 訊息生產者傳送訊息至交換機時觸發,用於判斷交換機是否成功收到訊息 * @param correlationData 相關設定資訊 * @param ack exchange 交換機,判斷交換機是否成功收到訊息 true 表示交換機收到 * @param cause 失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause)); log.info("correlationData -->"+correlationData.toString()); if(ack){ // 交換機接收到 log.info("---- confirm ----ack==true cause="+cause); }else{ // 沒有接收到 log.info("---- confirm ----ack==false cause="+cause); } }
上面的設定中,採取Confirm機制
,能夠更好的保證訊息生產者確認訊息是否正常到達Exchange中
。
但是,在MQ
中,由於使用Exchange
和Queue
進行了繫結,
如果某個佇列宕機了,Exchange並
未將訊息傳送
匹配 Routing Key 的佇列,那麼訊息就不能到達佇列中!!!
mq
中,對此情況設有另外一種監聽機制:Return
機制!
當訊息
由Exchange 未能傳遞到匹配的 queue 中
,則會通過ReturnCallback
根據使用者的抉擇,判斷是否需要返回給訊息生產者。
package cn.linkpower.service; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange,String routingKey,Object msg) { // 設定交換機處理失敗訊息的模式 true 表示訊息由交換機 到達不了佇列時,會將訊息重新返回給生產者 // 如果不設定這個指令,則交換機向佇列推播訊息失敗後,不會觸發 setReturnCallback rabbitTemplate.setMandatory(true); //訊息消費者確認收到訊息後,手動ack回執 rabbitTemplate.setConfirmCallback(this); // return 設定 rabbitTemplate.setReturnCallback(this); //傳送訊息 rabbitTemplate.convertAndSend(exchange,routingKey,msg); } /** * 交換機並未將資料丟入指定的佇列中時,觸發 * channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes()); * 引數三:true 表示如果訊息無法正常投遞,則return給生產者 ;false 表示直接丟棄 * @param message 訊息物件 * @param replyCode 錯誤碼 * @param replyText 錯誤資訊 * @param exchange 交換機 * @param routingKey 路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" "); } /** * 訊息生產者傳送訊息至交換機時觸發,用於判斷交換機是否成功收到訊息 * @param correlationData 相關設定資訊 * @param ack exchange 交換機,判斷交換機是否成功收到訊息 true 表示交換機收到 * @param cause 失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause)); log.info("correlationData -->"+correlationData.toString()); if(ack){ // 交換機接收到 log.info("---- confirm ----ack==true cause="+cause); }else{ // 沒有接收到 log.info("---- confirm ----ack==false cause="+cause); } } }
【注意:】設定 setReturnCallback 後,如果需要保證訊息未傳遞到指定的 queue,需要將訊息返回生產者時,一定要增加下面設定:
// 設定交換機處理失敗訊息的模式 true 表示訊息由交換機 到達不了佇列時,會將訊息重新返回給生產者 // 如果不設定這個指令,則交換機向佇列推播訊息失敗後,不會觸發 setReturnCallback rabbitTemplate.setMandatory(true);
修改對應的測試類,保證交換機正確,但路由key不存在對應的佇列即可。
// xiangjiao.routingKey 存在對應的queue // xiangjiao.routingKey_error 不存在對應的 queue rabbitmqService.sendMessage(MQConfiguration.EXCHANGE, "xiangjiao.routingKey_error", msg);
重啟專案,存取介面,進行測試:
訊息傳送給
Exchange
成功,但是通過Exchange
向Queue
中推播資料時 失敗,經過ReturnCallback 的 returnedMessage
捕獲監聽!
通過設定ConfirmCallback
和ReturnCallback
,便能實現訊息生產者到交換機
和訊息由exchange到queue
這個鏈路的安全性!
都是出現問題,或者正常後,給
生產者方
進行反饋。
到此這篇關於Springboot整合Rabbitmq之Confirm和Return詳解的文章就介紹到這了,更多相關Springboot整合Rabbitmq內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45