<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在訊息中有三種可能性造成資料丟失:
消費者消費訊息失敗:
RabbitMq存在應答機制,預設為自動應答,MQ向消費者推播一條訊息,消費者收到這條訊息後會返回一個ack(應答)給MQ,MQ收到應答後會刪除這條訊息。
自動應答存在一個問題,就是消費者收到訊息後立馬就會給MQ返回ack,如果消費者返回完ack但還沒來的及真正處理這條訊息時,消費者斷電宕機了,那麼這條訊息就丟失了。
這就是由於消費者消費訊息失敗造成的資料丟失。
生產者生產資料失敗:
生產者向MQ推播了一條訊息,但是由於由於諸如網路故障等原因mq並沒有收到該條訊息,這樣就造成了這條訊息的丟失。
MQ資料丟失:
MQ的資料是存在記憶體中的,諸如斷電等原因可能會造成資料的丟失。
解決以上列舉的資料丟失問題的辦法有三種:
手動應答:
RabbitMQ預設是自動應答,消費者收到訊息後就會自動返回ack給MQ,可以將應答模式改為手動應答,在消費者一側訊息的消費動作完成後手動來返回ack給MQ,用來解決“消費者消費訊息失敗”問題。
訊息確認機制:
當訊息佇列收到訊息後,告知生產者,讓生產者感知到自己生產的訊息,訊息佇列已經接收到,用來解決“生產者生產訊息失敗”問題。訊息確認機制有兩種實現方式:
持久化:
訊息佇列的訊息持久化到磁碟上,用來解決“MQ資料丟失”問題。
手動應答是通過設定channel來實現的,以下為一個完整程式碼範例。
設定類:
@Configuration public class config { @Bean public Queue queue(){ return new Queue("queue_01",false); } }
生產者:
@SpringBootTest(classes = Main.class) public class Producer { @Autowired RabbitTemplate rabbitTemplate; @Test public void producerMsg(){ rabbitTemplate.convertAndSend("queue_01","hello_world"); } }
消費者:
@Component @Slf4j public class Consumer { @RabbitListener(queues = {"queue_01"}) public void consumerMsg(String msg, Message message,Channel channel){ try { log.info("消費者消費訊息: "+msg); /** * 沒有異常就確認訊息 * basicAck(long deliveryTag, boolean multiple) * deliveryTag:當前訊息在佇列中的的索引; * multiple:為true的話就是批次確認 */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { /** * 有異常就拒收訊息 * basicNack(long deliveryTag, boolean multiple, boolean requeue) * requeue:true為將訊息重返當前訊息佇列,重新傳送給消費者; * false將訊息丟棄 */ try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } catch (Exception ex) { log.error(ex.getMessage()); } } } }
AQMP事務、confirm其實都是基於channel的。
AMQP事務和資料庫事務類似,定義一組對MQ的操作,統一提交,成功則全部一起執行,失敗則全部回滾。AMQP事務在spring boot中的使用很簡單,和資料庫的事務一樣,一個註解就可以搞定。
@GetMapping("/direct/wx/transactional") @Transactional(rollbackFor = Exception.class) public String sendDirectMessageTransactional() { rabbitTemplate.convertAndSend("direct_exchange", "wx","hello world!"); log.info("開啟事務訊息機制"); try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } return "ok"; }
confirm是基於channel的,一旦channel進入confirm模式,所有在該channel上釋出的訊息都會被指派一個唯一的ID(從1開始),訊息被投遞道匹配佇列後broker會傳送一個確認訊息給生產者。如果訊息和佇列是可持久化的(durable為true),那麼確認訊息會在訊息被寫入磁碟後發出。
confirm最大的好處在於非同步,生產者在等待上一條訊息的確認訊息的時候可以繼續往下傳送。
confirm在spring boot中的使用很簡單,在組態檔中開啟即可,並且支援自定義回撥函數:
組態檔:
spring.rabbitmq.publisher-confirms: true
spring.rabbitmq.publisher-returns: true
生產者:
@Slf4j @Component public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange,String routingKey,Object msg) { // 設定交換機處理失敗訊息的模式 true 表示訊息由交換機 到達不了佇列時,會將訊息重新返回給生產者 // 如果不設定這個指令,則交換機向佇列推播訊息失敗後,不會觸發 setReturnCallback rabbitTemplate.setMandatory(true); //訊息消費者確認收到訊息後,手動ack回執 rabbitTemplate.setConfirmCallback(this); // 暫時關閉 return 設定 //rabbitTemplate.setReturnCallback(this); //傳送訊息 rabbitTemplate.convertAndSend(exchange,routingKey,msg); } /** * 交換機並未將資料丟入指定的佇列中時,觸發 * channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes()); * 引數三:true 表示如果訊息無法正常投遞,則return給生產者 ;false 表示直接丟棄 * @param message 訊息物件 * @param replyCode 錯誤碼 * @param replyText 錯誤資訊 * @param exchange 交換機 * @param routingKey 路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" "); } /** * 訊息生產者傳送訊息至交換機時觸發,用於判斷交換機是否成功收到訊息 * @param correlationData 相關設定資訊 * @param ack exchange 交換機,判斷交換機是否成功收到訊息 true 表示交換機收到 * @param cause 失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause)); log.info("correlationData -->"+correlationData.toString()); if(ack){ // 交換機接收到 log.info("---- confirm ----ack==true cause="+cause); }else{ // 沒有接收到 log.info("---- confirm ----ack==false cause="+cause); } } }
到此這篇關於RabbitMq訊息防丟失功能實現方式講解的文章就介紹到這了,更多相關RabbitMq訊息防丟失內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45