<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
RocketMQ 事務訊息(Transactional Message)是指應用本地事務和傳送訊息操作可以被定義到全域性事務中,要麼同時成功,要麼同時失敗。RocketMQ 的事務訊息提供類似 X/Open XA 的分佈事務功能,通過事務訊息能達到分散式事務的最終一致。
RocketMQ事務訊息通過非同步確保方式,保證事務的最終一致性。設計的思想可以借鑑兩個階段提交事務。其執行流程圖如下:
@Component public class TransactionProduce { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private RocketMQTemplate rocketMQTemplate; public void sendTransactionMessage(String msg) { logger.info("start sendTransMessage hashKey:{}",msg); Message message =new Message(); message.setBody("this is tx message".getBytes()); TransactionSendResult result=rocketMQTemplate.sendMessageInTransaction("test-tx-rocketmq", MessageBuilder.withPayload(message).build(), msg); //傳送狀態 String sendStatus = result.getSendStatus().name(); // 本地事務執行狀態 String localTxState = result.getLocalTransactionState().name(); logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState); } }
說明:傳送事務訊息採用的是sendMessageInTransaction方法,返回結果為TransactionSendResult物件,該物件中包含了事務傳送的狀態、本地事務執行的狀態等。
@Component @RocketMQMessageListener(consumerGroup="test-txRocketmq-group",topic="test-tx-rocketmq", messageModel = MessageModel.CLUSTERING) public class TransactionConsumer implements RocketMQListener<String> { private Logger logger =LoggerFactory.getLogger(getClass()); @Override public void onMessage(String message) { logger.info("send transaction mssage parma is:{}", message); } }
說明:傳送事務訊息的消費者與普通的消費者一樣沒有太大的區別。
傳送事務訊息除了生產者和消費者以外,我們還需要建立生產者的訊息監聽器,來監聽本地事務執行的狀態和檢查本地事務狀態。
@RocketMQTransactionListener public class TransactionMsgListener implements RocketMQLocalTransactionListener { private Logger logger = LoggerFactory.getLogger(getClass()); /** * 執行本地事務 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) { logger.info("start invoke local rocketMQ transaction"); RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT; try { //處理業務 String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); logger.info("invoke msg content:{}",jsonStr); } catch (Exception e) { logger.error("invoke local mq trans error",e); resultState = RocketMQLocalTransactionState.UNKNOWN; } return resultState; } /** * 檢查本地事務的狀態 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { logger.info("start check Local rocketMQ transaction"); RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT; try { String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); logger.info("check trans msg content:{}",jsonStr); } catch (Exception e) { resultState = RocketMQLocalTransactionState.ROLLBACK; } return resultState; } }
說明:RocketMQ本地事務狀態由如下幾種:
注意:Spring Boot2.0的版本之後,@RocketMQTransactionListener 已經沒有了txProducerGroup屬性,且sendMessageInTransaction方法也將其移除。所以在同一專案中只能有一個@RocketMQTransactionListener,不能出現多個,否則會報如下錯誤:
java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener
c.s.fw.mq.produce.TransactionProduce - product start sendTransMessage msg:{"userId":"zhangsann"} c.s.f.m.p.TransactionMsgListener - start invoke local rocketMQ transaction c.s.f.m.p.TransactionMsgListener - invoke local transaction msg content:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null} c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:COMMIT_MESSAGE c.s.f.m.consumer.TransactionConsumer - send transaction mssage parma is:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
說明:通過紀錄檔我們可以看出,執行的流程與上述的一致,執行成功後,訊息執行成功返回的結果為SEND_OK,本地事務執行的狀態為COMMIT_MESSAGE。
如果在執行本地訊息時出現異常,那麼執行結果會是怎樣?修改下本地事務執行的方法,讓其出現異常。
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) { logger.info("start invoke local rocketMQ transaction"); RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT; try { //處理業務 String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); logger.info("invoke local transaction msg content:{}",jsonStr); int c=1/0; } catch (Exception e) { logger.error("invoke local mq trans error",e); resultState = RocketMQLocalTransactionState.UNKNOWN; } return resultState; }
c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:UNKNOW
從執行的結果可以看出,訊息執行成功返回的結果為SEND_OK,本地事務執行的狀態為:UNKNOW.所以消費端無法消費此訊息。
到此這篇關於SpringBoot整合RocketMQ傳送事務訊息的文章就介紹到這了,更多相關SpringBoot整合RocketMQ事務訊息內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45