首頁 > 軟體

SpringBoot整合RocketMQ傳送事務訊息的原理解析

2022-06-30 18:01:10

簡介

RocketMQ 事務訊息(Transactional Message)是指應用本地事務和傳送訊息操作可以被定義到全域性事務中,要麼同時成功,要麼同時失敗。RocketMQ 的事務訊息提供類似 X/Open XA 的分佈事務功能,通過事務訊息能達到分散式事務的最終一致。

原理

RocketMQ事務訊息通過非同步確保方式,保證事務的最終一致性。設計的思想可以借鑑兩個階段提交事務。其執行流程圖如下:

  • 傳送方向MQ伺服器端傳送訊息。
  • MQ Server將訊息持久化成功之後,向傳送方 ACK 確認訊息已經傳送成功,此時訊息為半訊息。
  • 傳送方開始執行本地事務邏輯。
  • 傳送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到 Commit 狀態則將半訊息標記為可投遞,訂閱方最終將收到該訊息;MQ Server 收到 Rollback 狀態則刪除半訊息,訂閱方將不會接受該訊息。
  • 在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間後 MQ Server 將對該訊息發起訊息回查。
  • 傳送方收到訊息回查後,需要檢查對應訊息的本地事務執行的最終結果。
  • 傳送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟4對半訊息進行操作。

具體實現

消費者

@Component
public class TransactionProduce
{
    private Logger logger = LoggerFactory.getLogger(getClass());
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendTransactionMessage(String msg)
    {
        logger.info("start sendTransMessage hashKey:{}",msg);
       
         Message message =new Message();
         message.setBody("this is tx message".getBytes());
         TransactionSendResult result=rocketMQTemplate.sendMessageInTransaction("test-tx-rocketmq", 
                 MessageBuilder.withPayload(message).build(), msg);
         
         //傳送狀態
         String sendStatus = result.getSendStatus().name();
         // 本地事務執行狀態
         String localTxState = result.getLocalTransactionState().name();
         logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState);
    } 
}

說明:傳送事務訊息採用的是sendMessageInTransaction方法,返回結果為TransactionSendResult物件,該物件中包含了事務傳送的狀態、本地事務執行的狀態等。

消費者

@Component
@RocketMQMessageListener(consumerGroup="test-txRocketmq-group",topic="test-tx-rocketmq", messageModel = MessageModel.CLUSTERING)
public class TransactionConsumer implements RocketMQListener<String>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(String message)
    {
        logger.info("send transaction mssage parma is:{}", message);
    }
}

說明:傳送事務訊息的消費者與普通的消費者一樣沒有太大的區別。

生產者訊息監聽器

傳送事務訊息除了生產者和消費者以外,我們還需要建立生產者的訊息監聽器,來監聽本地事務執行的狀態和檢查本地事務狀態。

@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener
{
    private Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * 執行本地事務
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
            Object obj)
    {
        logger.info("start invoke local rocketMQ transaction");
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
        
        try
        {
            //處理業務
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("invoke msg content:{}",jsonStr);
        }
        catch (Exception e)
        {
            logger.error("invoke local mq trans error",e);
            resultState = RocketMQLocalTransactionState.UNKNOWN;
        }
        
        return resultState;
    }

    /**
     * 檢查本地事務的狀態
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg)
    {
        logger.info("start check Local rocketMQ transaction");
        
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
        
        try
        {
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("check trans msg content:{}",jsonStr);
        }
        catch (Exception e)
        {
            resultState  = RocketMQLocalTransactionState.ROLLBACK;
        }
        return resultState;
    }
}

說明:RocketMQ本地事務狀態由如下幾種:

  • RocketMQLocalTransactionState.COMMIT:提交事務,允許消費者消費此訊息。
  • RocketMQLocalTransactionState.ROLLBACK: 回滾事務,訊息將被刪除,不允許被消費。
  • RocketMQLocalTransactionState.UNKNOWN:中間狀態,代表需要進行檢查來確定狀態。

注意:Spring Boot2.0的版本之後,@RocketMQTransactionListener 已經沒有了txProducerGroup屬性,且sendMessageInTransaction方法也將其移除。所以在同一專案中只能有一個@RocketMQTransactionListener,不能出現多個,否則會報如下錯誤:

java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener

訊息事務測試

正常測試

c.s.fw.mq.produce.TransactionProduce - product start sendTransMessage msg:{"userId":"zhangsann"}
c.s.f.m.p.TransactionMsgListener - start invoke local rocketMQ transaction
c.s.f.m.p.TransactionMsgListener - invoke local transaction msg content:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:COMMIT_MESSAGE
c.s.f.m.consumer.TransactionConsumer - send transaction mssage parma is:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}

說明:通過紀錄檔我們可以看出,執行的流程與上述的一致,執行成功後,訊息執行成功返回的結果為SEND_OK,本地事務執行的狀態為COMMIT_MESSAGE。

異常測試

如果在執行本地訊息時出現異常,那麼執行結果會是怎樣?修改下本地事務執行的方法,讓其出現異常。

程式碼調整

  @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
            Object obj)
    {
        logger.info("start invoke local rocketMQ transaction");
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
        
        try
        {
            //處理業務
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("invoke local transaction msg content:{}",jsonStr);
             int c=1/0;
        }
        catch (Exception e)
        {
            logger.error("invoke local mq trans error",e);
            resultState = RocketMQLocalTransactionState.UNKNOWN;
        }
        
        return resultState;
    }

執行結果

c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:UNKNOW

從執行的結果可以看出,訊息執行成功返回的結果為SEND_OK,本地事務執行的狀態為:UNKNOW.所以消費端無法消費此訊息。

總結

到此這篇關於SpringBoot整合RocketMQ傳送事務訊息的文章就介紹到這了,更多相關SpringBoot整合RocketMQ事務訊息內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com