首頁 > 軟體

RabbitMq訊息防丟失功能實現方式講解

2023-01-22 14:01:31

1.概述

1.1.資料丟失的原因

在訊息中有三種可能性造成資料丟失:

  • 消費者消費訊息失敗
  • 生產者生產訊息失敗
  • MQ資料丟失

消費者消費訊息失敗:

RabbitMq存在應答機制,預設為自動應答,MQ向消費者推播一條訊息,消費者收到這條訊息後會返回一個ack(應答)給MQ,MQ收到應答後會刪除這條訊息。

自動應答存在一個問題,就是消費者收到訊息後立馬就會給MQ返回ack,如果消費者返回完ack但還沒來的及真正處理這條訊息時,消費者斷電宕機了,那麼這條訊息就丟失了。

這就是由於消費者消費訊息失敗造成的資料丟失。

生產者生產資料失敗:

生產者向MQ推播了一條訊息,但是由於由於諸如網路故障等原因mq並沒有收到該條訊息,這樣就造成了這條訊息的丟失。

MQ資料丟失:

MQ的資料是存在記憶體中的,諸如斷電等原因可能會造成資料的丟失。

1.2.如何防止資料丟失

解決以上列舉的資料丟失問題的辦法有三種:

  • 手動應答
  • 訊息確認機制
  • 持久化

手動應答:

RabbitMQ預設是自動應答,消費者收到訊息後就會自動返回ack給MQ,可以將應答模式改為手動應答,在消費者一側訊息的消費動作完成後手動來返回ack給MQ,用來解決“消費者消費訊息失敗”問題。

訊息確認機制:

當訊息佇列收到訊息後,告知生產者,讓生產者感知到自己生產的訊息,訊息佇列已經接收到,用來解決“生產者生產訊息失敗”問題。訊息確認機制有兩種實現方式:

  • AMQP事務
  • confirm

持久化:

訊息佇列的訊息持久化到磁碟上,用來解決“MQ資料丟失”問題。

2.手動應答

手動應答是通過設定channel來實現的,以下為一個完整程式碼範例。

設定類:

@Configuration
public class config {
    @Bean
    public Queue queue(){
        return new Queue("queue_01",false);
    }
}

生產者:

@SpringBootTest(classes = Main.class)
public class Producer {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void producerMsg(){
        rabbitTemplate.convertAndSend("queue_01","hello_world");
    }
}

消費者:

@Component
@Slf4j
public class Consumer {
    @RabbitListener(queues = {"queue_01"})
    public void consumerMsg(String msg, Message message,Channel channel){
        try {
            log.info("消費者消費訊息: "+msg);
            /**
             * 沒有異常就確認訊息
             * basicAck(long deliveryTag, boolean multiple)
             * deliveryTag:當前訊息在佇列中的的索引;
             * multiple:為true的話就是批次確認
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            /**
             * 有異常就拒收訊息
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * requeue:true為將訊息重返當前訊息佇列,重新傳送給消費者;
             *         false將訊息丟棄
             */
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (Exception ex) {
                log.error(ex.getMessage());
            }
        }
    }
}

3.訊息確認機制

AQMP事務、confirm其實都是基於channel的。

3.1.AMQP事務

AMQP事務和資料庫事務類似,定義一組對MQ的操作,統一提交,成功則全部一起執行,失敗則全部回滾。AMQP事務在spring boot中的使用很簡單,和資料庫的事務一樣,一個註解就可以搞定。

@GetMapping("/direct/wx/transactional")
@Transactional(rollbackFor = Exception.class)
public String sendDirectMessageTransactional() {
  rabbitTemplate.convertAndSend("direct_exchange", "wx","hello world!");
  log.info("開啟事務訊息機制");
    try {
           Thread.sleep(5000);
       } catch (Exception e) {
            e.printStackTrace();
       }
      return "ok";
}

3.2.confirm

confirm是基於channel的,一旦channel進入confirm模式,所有在該channel上釋出的訊息都會被指派一個唯一的ID(從1開始),訊息被投遞道匹配佇列後broker會傳送一個確認訊息給生產者。如果訊息和佇列是可持久化的(durable為true),那麼確認訊息會在訊息被寫入磁碟後發出。

confirm最大的好處在於非同步,生產者在等待上一條訊息的確認訊息的時候可以繼續往下傳送。

confirm在spring boot中的使用很簡單,在組態檔中開啟即可,並且支援自定義回撥函數:

組態檔:

spring.rabbitmq.publisher-confirms: true

spring.rabbitmq.publisher-returns: true

生產者:

@Slf4j
@Component
public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String exchange,String routingKey,Object msg) {
        // 設定交換機處理失敗訊息的模式     true 表示訊息由交換機 到達不了佇列時,會將訊息重新返回給生產者
        // 如果不設定這個指令,則交換機向佇列推播訊息失敗後,不會觸發 setReturnCallback
        rabbitTemplate.setMandatory(true);
        //訊息消費者確認收到訊息後,手動ack回執
        rabbitTemplate.setConfirmCallback(this);
        // 暫時關閉 return 設定
        //rabbitTemplate.setReturnCallback(this);
        //傳送訊息
        rabbitTemplate.convertAndSend(exchange,routingKey,msg);
    }
    /**
     * 交換機並未將資料丟入指定的佇列中時,觸發
     *  channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
     *  引數三:true  表示如果訊息無法正常投遞,則return給生產者 ;false 表示直接丟棄
     * @param message   訊息物件
     * @param replyCode 錯誤碼
     * @param replyText 錯誤資訊
     * @param exchange 交換機
     * @param routingKey 路由鍵
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
    }
    /**
     * 訊息生產者傳送訊息至交換機時觸發,用於判斷交換機是否成功收到訊息
     * @param correlationData  相關設定資訊
     * @param ack exchange 交換機,判斷交換機是否成功收到訊息    true 表示交換機收到
     * @param cause  失敗原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("---- confirm ----ack="+ack+"  cause="+String.valueOf(cause));
        log.info("correlationData -->"+correlationData.toString());
        if(ack){
            // 交換機接收到
            log.info("---- confirm ----ack==true  cause="+cause);
        }else{
            // 沒有接收到
            log.info("---- confirm ----ack==false  cause="+cause);
        }
    }
}

到此這篇關於RabbitMq訊息防丟失功能實現方式講解的文章就介紹到這了,更多相關RabbitMq訊息防丟失內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com