<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
1、maven依賴
<dependencies> <!-- springboot-web元件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
2、yml組態檔
rocketmq:
###連線地址nameServer
name-server: www.kaicostudy.com:9876;
producer:
group: kaico_producer
server:
port: 8088
3、生產者
@RequestMapping("/sendMsg") public String sendMsg() { OrderEntity orderEntity = new OrderEntity("123456","騰訊視訊會員"); SendResult kaicoTopic = rocketMQTemplate.syncSend("kaicoTopic"+":"+"tag1", orderEntity); System.out.println("返回傳送訊息狀態:" + kaicoTopic); return "success"; }
4、消費者
@Service @RocketMQMessageListener(topic = "kaicoTopic", selectorExpression ="tag1", consumerGroup = "kaico_consumer", messageModel = MessageModel.CLUSTERING) public class OrdeConsumer2 implements RocketMQListener<OrderEntity> { @Override public void onMessage(OrderEntity o) { System.out.println("kaico_consumer2消費者接收物件:" + o.toString()); } }
叢集消費
當 consumer 使用叢集消費時,每條訊息只會被 consumer 叢集內的任意一個 consumer 範例消費一次。
同時記住一點,使用叢集消費的時候,consumer 的消費進度是儲存在 broker 上,consumer 自身是不儲存消費進度的。訊息進度儲存在 broker 上的好處在於,當你 consumer 叢集是擴大或者縮小時,由於消費進度統一在broker上,訊息重複的概率會被大大降低了。
注意: 在叢集消費模式下,並不能保證每一次訊息失敗重投都投遞到同一個 consumer 範例。
註解設定:messageModel = MessageModel.CLUSTERING
廣播消費
當 consumer 使用廣播消費時,每條訊息都會被 consumer 叢集內所有的 consumer 範例消費一次,也就是說每條訊息至少被每一個 consumer 範例消費一次。
與叢集消費不同的是,consumer 的消費進度是儲存在各個 consumer 範例上,這就容易造成訊息重複。還有很重要的一點,對於廣播消費來說,是不會進行消費失敗重投的,所以在 consumer 端消費邏輯處理時,需要額外關注消費失敗的情況。
雖然廣播消費能保證叢集內每個 consumer 範例都能消費訊息,但是消費進度的維護、不具備訊息重投的機制大大影響了實際的使用。因此,在實際使用中,更推薦使用叢集消費,因為叢集消費不僅擁有消費進度儲存的可靠性,還具有訊息重投的機制。而且,我們通過叢集消費也可以達到廣播消費的效果。
註解設定:messageModel = MessageModel.BROADCASTING
生產者組
一個生產者組,代表著一群topic相同的Producer。即一個生產者組是同一類Producer的組合。
如果Producer是TransactionMQProducer,則傳送的是事務訊息。如果節點1傳送完訊息後,訊息儲存到broker的Half Message Queue中,還未儲存到目標topic的queue中時,此時節點1崩潰,則可以通過同一Group下的節點2進行二階段提交,或回溯。
使用時,一個節點下,一個topic會對應一個producer
消費者組
一個消費者組,代表著一群topic相同,tag相同(即邏輯相同)的Consumer。通過一個消費者組,則可容易的進行負載均衡以及容錯
使用時,一個節點下,一個topic加一個tag可以對應一個consumer。一個消費者組就是橫向上多個節點的相同consumer為一個消費組。
首先分析一下producer。習慣上我們不會建立多個訂閱了相同topic的Producer範例,因為一個Producer範例傳送訊息時是通過ExecutorService執行緒池去非同步執行的,不會阻塞完全夠用,如果建立了多個相同topic的Producer則會影響效能。而Consumer則不同。訊息會在一topic下會細分多個tag,需要針對tag需要針對不同的tag建立多個消費者範例。
注意:多個不同的消費者組訂閱同一個topic、tag,如果設定的是叢集消費模式,每一個消費者組中都會有一個消費者來消費。也就是說不同的消費者組訂閱同一個topic相互之間是沒有影響的。
同步: 傳送訊息後需等待結果,訊息的可靠性高傳送速度慢;
SendResult kaicoTopic = rocketMQTemplate.syncSend("kaicoTopic"+":"+"tag1", orderEntity);
非同步: 訊息傳送後,回撥通知結果,訊息傳送速度快,訊息可靠性低;
//非同步傳送 rocketMQTemplate.asyncSend("kaicoTopic" + ":" + "tag1", orderEntity, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("非同步傳送訊息成功"); } @Override public void onException(Throwable throwable) { System.out.println("非同步傳送訊息失敗"); } });
單向(oneway):訊息傳送後,不關心結果,傳送速度最快,訊息可靠性最差,適用於在大量紀錄檔資料和使用者行為資料等場景傳送資料。
//單向(oneway)傳送 rocketMQTemplate.sendOneWay("kaicoTopic"+":"+"tag1", orderEntity);
主要三個步驟
1、生產者保證訊息傳送成功
採用同步傳送訊息的方式,傳送訊息後有返回結果,保證訊息傳送成功。(程式碼見上面)
返回四個狀態
2、rocketMQ將訊息持久化,保證宕機後訊息不會丟失。持久化策略(刷盤策略)
flushDiskType=SYNC_FLUSH
3、Broker 多副本和高可用
Broker 為了保證高可用,採用一主多從的方式部署。
訊息傳送到 master 節點後,slave 節點會從 master 拉取訊息保持跟 master 的一致。這個過程預設是非同步的,即 master 收到訊息後,不等 slave 節點複製訊息就直接給 Producer 返回成功。
這樣會有一個問題,如果 slave 節點還沒有完成訊息複製,這時 master 宕機了,進行主備切換後就會有訊息丟失。為了避免這個問題,可以採用 slave 節點同步複製訊息,即等 slave 節點複製訊息成功後再給 Producer 返回傳送成功。只需要增加下面的設定:brokerRole=SYNC_MASTER
改為同步複製後,訊息複製流程如下:
4、消費者保證訊息消費成功
消費者消費訊息後,如果 Consumer 消費成功,返回 CONSUME_SUCCESS,提交 offset 並從 Broker 拉取下一批訊息。
@Service public class NoSpringBootOrderConsumer { private DefaultMQPushConsumer defaultMQPushConsumer; @Value("${rocketmq.name-server}") private String namesrvAddr; protected String consumerGroup; protected String topic; protected String topicTag; public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } public void setTopic(String topic) { this.topic = topic; } public void setTopicTag(String topicTag) { this.topicTag = topicTag; } public static String encoding = System.getProperty("file.encoding"); /* * @Author ex_fengkai * @Description //TODO 初始化資料(消費者組名稱、topic、topic的tag、nameServer的資訊) * @Date 2020/11/9 14:36 * @Param [] * @return void **/ private void initParam() { this.consumerGroup = "kaico_consumer3"; this.topic = "kaicoTopic"; this.topicTag = "tag1"; this.setNamesrvAddr(namesrvAddr); } @PostConstruct private void init() throws InterruptedException, MQClientException { initParam(); // ConsumerGroupName需要由應用來保證唯一,用於把多個Consumer組織到一起,提高並行處理能力 defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup); defaultMQPushConsumer.setNamesrvAddr(namesrvAddr); //設定nameServer伺服器 defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis())); defaultMQPushConsumer.setVipChannelEnabled(false); // 設定Consumer第一次啟動是從佇列頭部開始消費 defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 訂閱指定Topic下的topicTag System.out.println("consumerGroup:" + consumerGroup + " topic:" + topic + " ,topicTag:" + topicTag); defaultMQPushConsumer.subscribe(topic, topicTag); // 設定為叢集消費 defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING); // 通過匿名訊息監聽處理訊息消費 defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 預設msgs裡只有一條訊息,可以通過設定consumeMessageBatchMaxSize引數來批次接收訊息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); if (msg.getTopic().equals(topic) && msg.getTags() != null && msg.getTags().equals(topicTag)) { // 執行topic下對應tag的消費邏輯 try { onMessage(new String(msg.getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { System.out.println("系統不支援訊息編碼格式:" + encoding); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } catch (Exception e) { System.out.println("訊息處理異常"); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } System.out.println("consumerGroup:" + consumerGroup + " MsgId:" + msg.getMsgId() + " was done!"); } // 如果沒有return success ,consumer會重新消費該訊息,直到return success return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可 defaultMQPushConsumer.start(); System.out.println("consumerGroup:" + consumerGroup + " namesrvAddr:" + namesrvAddr + " start success!"); } @PreDestroy public void destroy() { defaultMQPushConsumer.shutdown(); } private void onMessage(String s) { System.out.println(consumerGroup + "用spring的方式的消費者消費:" + s); } }
Consumer 重試
Consumer 消費失敗,這裡有 3 種情況:
Broker 收到這個響應後,會把這條訊息放入重試佇列,重新傳送給 Consumer。
注意:Broker 預設最多重試 16 次,如果重試 16 次都失敗,就把這條訊息放入死信佇列,Consumer 可以訂閱死信佇列進行消費。重試只有在叢集模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。Consumer 端一定要做好冪等處理。
生產者程式碼
//傳送順序訊息 @RequestMapping("/sendMsg1") public String sendMsg1() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Long orderId = System.currentTimeMillis(); String insertSql = getSqlMsg("insert", orderId); String updateSql = getSqlMsg("update", orderId); String deleteSql = getSqlMsg("delete", orderId); Message insertMsg = new Message("kaicoTopic", "tag6", insertSql.getBytes()); Message updateMsg = new Message("kaicoTopic", "tag6", updateSql.getBytes()); Message deleteMsg = new Message("kaicoTopic", "tag6", deleteSql.getBytes()); DefaultMQProducer producer = rocketMQTemplate.getProducer(); rocketMQTemplate.getProducer().send(insertMsg , new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 該訊息存放到佇列0中 return mqs.get(0); } }, orderId); rocketMQTemplate.getProducer().send(updateMsg , new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 該訊息存放到佇列0中 return mqs.get(0); } }, orderId); rocketMQTemplate.getProducer().send(deleteMsg , new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 該訊息存放到佇列0中 return mqs.get(0); } }, orderId); return orderId + ""; }
消費者程式碼
@Service @RocketMQMessageListener(topic = "kaicoTopic", selectorExpression ="tag6", consumerGroup = "kaico_consumer1", messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.ORDERLY, consumeThreadMax = 1) public class OrdeConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt msg) { System.out.println(Thread.currentThread().getName() + "-kaico_consumer1消費者接收物件:佇列" + msg.getQueueId() + "=訊息:" + new String(msg.getBody())); } }
實現思路
1、生產者傳送事務訊息sendMessageInTransaction
public String saveOrder() { // 提前生成我們的訂單id String orderId = System.currentTimeMillis() + ""; /** * 1.提前生成我們的半訊息 * 2.半訊息傳送成功之後,在執行我們的本地事務 */ OrderEntity orderEntity = createOrder(orderId); String msg = JSONObject.toJSONString(orderEntity); MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(msg); stringMessageBuilder.setHeader("msg", msg); Message message = stringMessageBuilder.build(); // 該訊息不允許被消費者消費,生產者的事務邏輯程式碼在生產者事務監聽類中executeLocalTransaction方法中執行。 rocketMQTemplate.sendMessageInTransaction("kaicoProducer", "orderTopic", message, null); return orderId; }
2、事務監聽類
@Slf4j @Component @RocketMQTransactionListener(txProducerGroup = "kaicoProducer") //這個mayiktProducer生產者的事務管理 public class SyncProducerListener implements RocketMQLocalTransactionListener { @Autowired private OrderMapper orderMapper; @Autowired private TransationalUtils transationalUtils; /** * 執行我們訂單的事務 * @param msg * @param arg * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { MessageHeaders headers = msg.getHeaders(); //拿到訊息 Object object = headers.get("msg"); if (object == null) { return null; } String orderMsg = (String) object; OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class); TransactionStatus begin = null; try { begin = transationalUtils.begin(); int result = orderMapper.addOrder(orderEntity); transationalUtils.commit(begin); if (result <= 0) { return RocketMQLocalTransactionState.ROLLBACK; } // 告訴我們的Broke可以消費者該訊息 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { if (begin != null) { transationalUtils.rollback(begin); return RocketMQLocalTransactionState.ROLLBACK; } } //add.Order return null; } /** * 提供給我們的Broker定時檢查 * @param msg * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { MessageHeaders headers = msg.getHeaders(); Object object = headers.get("msg"); if (object == null) { return RocketMQLocalTransactionState.ROLLBACK; } String orderMsg = (String) object; OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class); String orderId = orderEntity.getOrderId(); // 直接查詢我們的資料庫 OrderEntity orderDbEntity = orderMapper.findOrderId(orderId); if (orderDbEntity == null) { //不確認,繼續重試 return RocketMQLocalTransactionState.UNKNOWN; } //提交事務 return RocketMQLocalTransactionState.COMMIT; } }
3、消費者消費訊息
@Service @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "kaicoTopic") public class OrdeConsumer implements RocketMQListener<String> { @Autowired private DispatchMapper dispatchMapper; @Override public void onMessage(String msg) { OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class); String orderId = orderEntity.getOrderId(); // 模擬userid為=123456 DispatchEntity dispatchEntity = new DispatchEntity(orderId, 123456L); dispatchMapper.insertDistribute(dispatchEntity); } }
到此這篇關於SpringBoot整合rockerMQ訊息佇列詳解的文章就介紹到這了,更多相關SpringBoot整合rockerMQ內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45