首頁 > 軟體

SpringBoot整合rockerMQ訊息佇列詳解

2022-07-26 14:01:19

Springboot整合RockerMQ

1、maven依賴

<dependencies>
    <!-- springboot-web元件 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

2、yml組態檔

rocketmq:
  ###連線地址nameServer
  name-server: www.kaicostudy.com:9876;
  producer:
    group: kaico_producer
server:
  port: 8088

3、生產者

@RequestMapping("/sendMsg")
    public String sendMsg() {
        OrderEntity orderEntity = new OrderEntity("123456","騰訊視訊會員");
        SendResult kaicoTopic = rocketMQTemplate.syncSend("kaicoTopic"+":"+"tag1", orderEntity);
        System.out.println("返回傳送訊息狀態:" + kaicoTopic);
        return "success";
    }

4、消費者

@Service
@RocketMQMessageListener(topic = "kaicoTopic", selectorExpression ="tag1", consumerGroup = "kaico_consumer", messageModel = MessageModel.CLUSTERING)
public class OrdeConsumer2 implements RocketMQListener<OrderEntity> {
    @Override
    public void onMessage(OrderEntity o) {
        System.out.println("kaico_consumer2消費者接收物件:" + o.toString());
    }
}

使用總結

消費模式

叢集消費
當 consumer 使用叢集消費時,每條訊息只會被 consumer 叢集內的任意一個 consumer 範例消費一次。
同時記住一點,使用叢集消費的時候,consumer 的消費進度是儲存在 broker 上,consumer 自身是不儲存消費進度的。訊息進度儲存在 broker 上的好處在於,當你 consumer 叢集是擴大或者縮小時,由於消費進度統一在broker上,訊息重複的概率會被大大降低了。
注意: 在叢集消費模式下,並不能保證每一次訊息失敗重投都投遞到同一個 consumer 範例。

註解設定:messageModel = MessageModel.CLUSTERING

廣播消費
當 consumer 使用廣播消費時,每條訊息都會被 consumer 叢集內所有的 consumer 範例消費一次,也就是說每條訊息至少被每一個 consumer 範例消費一次。
與叢集消費不同的是,consumer 的消費進度是儲存在各個 consumer 範例上,這就容易造成訊息重複。還有很重要的一點,對於廣播消費來說,是不會進行消費失敗重投的,所以在 consumer 端消費邏輯處理時,需要額外關注消費失敗的情況。
雖然廣播消費能保證叢集內每個 consumer 範例都能消費訊息,但是消費進度的維護、不具備訊息重投的機制大大影響了實際的使用。因此,在實際使用中,更推薦使用叢集消費,因為叢集消費不僅擁有消費進度儲存的可靠性,還具有訊息重投的機制。而且,我們通過叢集消費也可以達到廣播消費的效果。

註解設定:messageModel = MessageModel.BROADCASTING

生產者組和消費者組

生產者組
一個生產者組,代表著一群topic相同的Producer。即一個生產者組是同一類Producer的組合。

如果Producer是TransactionMQProducer,則傳送的是事務訊息。如果節點1傳送完訊息後,訊息儲存到broker的Half Message Queue中,還未儲存到目標topic的queue中時,此時節點1崩潰,則可以通過同一Group下的節點2進行二階段提交,或回溯。

使用時,一個節點下,一個topic會對應一個producer

消費者組
一個消費者組,代表著一群topic相同,tag相同(即邏輯相同)的Consumer。通過一個消費者組,則可容易的進行負載均衡以及容錯

使用時,一個節點下,一個topic加一個tag可以對應一個consumer。一個消費者組就是橫向上多個節點的相同consumer為一個消費組。

首先分析一下producer。習慣上我們不會建立多個訂閱了相同topic的Producer範例,因為一個Producer範例傳送訊息時是通過ExecutorService執行緒池去非同步執行的,不會阻塞完全夠用,如果建立了多個相同topic的Producer則會影響效能。而Consumer則不同。訊息會在一topic下會細分多個tag,需要針對tag需要針對不同的tag建立多個消費者範例。

注意:多個不同的消費者組訂閱同一個topic、tag,如果設定的是叢集消費模式,每一個消費者組中都會有一個消費者來消費。也就是說不同的消費者組訂閱同一個topic相互之間是沒有影響的。

生產者投遞訊息的三種方式

同步: 傳送訊息後需等待結果,訊息的可靠性高傳送速度慢;

 SendResult kaicoTopic = rocketMQTemplate.syncSend("kaicoTopic"+":"+"tag1", orderEntity);

非同步: 訊息傳送後,回撥通知結果,訊息傳送速度快,訊息可靠性低;

 //非同步傳送
        rocketMQTemplate.asyncSend("kaicoTopic" + ":" + "tag1", orderEntity, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("非同步傳送訊息成功");
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("非同步傳送訊息失敗");
            }
        });

單向(oneway):訊息傳送後,不關心結果,傳送速度最快,訊息可靠性最差,適用於在大量紀錄檔資料和使用者行為資料等場景傳送資料。

//單向(oneway)傳送
        rocketMQTemplate.sendOneWay("kaicoTopic"+":"+"tag1", orderEntity);

如何保證訊息不丟失

主要三個步驟
1、生產者保證訊息傳送成功
採用同步傳送訊息的方式,傳送訊息後有返回結果,保證訊息傳送成功。(程式碼見上面)
返回四個狀態

  • SEND_OK:訊息傳送成功。需要注意的是,訊息傳送到 broker 後,還有兩個操作:訊息刷盤和訊息同步到 slave 節點,預設這兩個操作都是非同步的,只有把這兩個操作都改為同步,SEND_OK 這個狀態才能真正表示傳送成功。
  • FLUSH_DISK_TIMEOUT:訊息傳送成功但是訊息刷盤超時。
  • FLUSH_SLAVE_TIMEOUT:訊息傳送成功但是訊息同步到 slave 節點時超時。
  • SLAVE_NOT_AVAILABLE:訊息傳送成功但是 broker 的 slave 節點不可用。

2、rocketMQ將訊息持久化,保證宕機後訊息不會丟失。持久化策略(刷盤策略)

  • 非同步刷盤:預設。訊息寫入 CommitLog 時,並不會直接寫入磁碟,而是先寫入 PageCache 快取後返回成功,然後用後臺執行緒非同步把訊息刷入磁碟。非同步刷盤提高了訊息吞吐量,但是可能會有訊息丟失的情況,比如斷點導致機器停機,PageCache 中沒來得及刷盤的訊息就會丟失。
  • 同步刷盤:訊息寫入記憶體後,立刻請求刷盤執行緒進行刷盤,如果訊息未在約定的時間內(預設 5 s)刷盤成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到這個響應後,可以進行重試。同步刷盤策略保證了訊息的可靠性,同時降低了吞吐量,增加了延遲。要開啟同步刷盤,需要增加下面設定:

flushDiskType=SYNC_FLUSH

3、Broker 多副本和高可用
Broker 為了保證高可用,採用一主多從的方式部署。
訊息傳送到 master 節點後,slave 節點會從 master 拉取訊息保持跟 master 的一致。這個過程預設是非同步的,即 master 收到訊息後,不等 slave 節點複製訊息就直接給 Producer 返回成功。

這樣會有一個問題,如果 slave 節點還沒有完成訊息複製,這時 master 宕機了,進行主備切換後就會有訊息丟失。為了避免這個問題,可以採用 slave 節點同步複製訊息,即等 slave 節點複製訊息成功後再給 Producer 返回傳送成功。只需要增加下面的設定:
brokerRole=SYNC_MASTER

改為同步複製後,訊息複製流程如下:

  • slave 初始化後,跟 master 建立連線並向 master 傳送自己的 offset;
  • master 收到 slave 傳送的 offset 後,將 offset 後面的訊息批次傳送給 slave;
  • slave 把收到的訊息寫入 commitLog 檔案,並給 master 傳送新的 offset;
  • master 收到新的 offset 後,如果 offset >= producer 傳送訊息後的 offset,給 Producer 返回 SEND_OK。

4、消費者保證訊息消費成功
消費者消費訊息後,如果 Consumer 消費成功,返回 CONSUME_SUCCESS,提交 offset 並從 Broker 拉取下一批訊息。

@Service
public class NoSpringBootOrderConsumer {
    private DefaultMQPushConsumer defaultMQPushConsumer;
    @Value("${rocketmq.name-server}")
    private String namesrvAddr;
    protected String consumerGroup;
    protected String topic;
    protected String topicTag;
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }
    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }
    public void setTopic(String topic) {
        this.topic = topic;
    }
    public void setTopicTag(String topicTag) {
        this.topicTag = topicTag;
    }
    public static String encoding = System.getProperty("file.encoding");
    /*
     * @Author ex_fengkai
     * @Description //TODO 初始化資料(消費者組名稱、topic、topic的tag、nameServer的資訊)
     * @Date 2020/11/9 14:36
     * @Param []
     * @return void
     **/
    private void initParam() {
        this.consumerGroup = "kaico_consumer3";
        this.topic = "kaicoTopic";
        this.topicTag = "tag1";
        this.setNamesrvAddr(namesrvAddr);
    }
    @PostConstruct
    private void init() throws InterruptedException, MQClientException {
        initParam();
        // ConsumerGroupName需要由應用來保證唯一,用於把多個Consumer組織到一起,提高並行處理能力
        defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
        defaultMQPushConsumer.setNamesrvAddr(namesrvAddr); //設定nameServer伺服器
        defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        defaultMQPushConsumer.setVipChannelEnabled(false);
        // 設定Consumer第一次啟動是從佇列頭部開始消費
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 訂閱指定Topic下的topicTag
        System.out.println("consumerGroup:" + consumerGroup + " topic:" + topic + " ,topicTag:" + topicTag);
        defaultMQPushConsumer.subscribe(topic, topicTag);
        // 設定為叢集消費
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // 通過匿名訊息監聽處理訊息消費
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            // 預設msgs裡只有一條訊息,可以通過設定consumeMessageBatchMaxSize引數來批次接收訊息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals(topic) && msg.getTags() != null && msg.getTags().equals(topicTag)) {
                    // 執行topic下對應tag的消費邏輯
                    try {
                        onMessage(new String(msg.getBody(),"utf-8"));

                    } catch (UnsupportedEncodingException e) {
                        System.out.println("系統不支援訊息編碼格式:" + encoding);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                    } catch (Exception e) {
                        System.out.println("訊息處理異常");
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    System.out.println("consumerGroup:" + consumerGroup + " MsgId:" + msg.getMsgId() + " was done!");
                }
                // 如果沒有return success ,consumer會重新消費該訊息,直到return success
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可
        defaultMQPushConsumer.start();
        System.out.println("consumerGroup:" + consumerGroup + " namesrvAddr:" + namesrvAddr + "  start success!");
    }
    @PreDestroy
    public void destroy() {
        defaultMQPushConsumer.shutdown();
    }
    private void onMessage(String s) {
        System.out.println(consumerGroup + "用spring的方式的消費者消費:" + s);
    }
}

Consumer 重試
Consumer 消費失敗,這裡有 3 種情況:

  • 返回 RECONSUME_LATER
  • 返回 null
  • 丟擲異常

Broker 收到這個響應後,會把這條訊息放入重試佇列,重新傳送給 Consumer。

注意:Broker 預設最多重試 16 次,如果重試 16 次都失敗,就把這條訊息放入死信佇列,Consumer 可以訂閱死信佇列進行消費。重試只有在叢集模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。Consumer 端一定要做好冪等處理。

順序訊息

  • 生產者投遞訊息根據key投遞到同一個佇列中存放
  • 消費者應該訂閱到同一個佇列實現消費
  • 最終應該使用同一個執行緒去消費訊息(不能夠實現多執行緒消費。)

生產者程式碼

//傳送順序訊息
    @RequestMapping("/sendMsg1")
    public String sendMsg1() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Long orderId = System.currentTimeMillis();
        String insertSql = getSqlMsg("insert", orderId);
        String updateSql = getSqlMsg("update", orderId);
        String deleteSql = getSqlMsg("delete", orderId);
        Message insertMsg = new Message("kaicoTopic", "tag6", insertSql.getBytes());
        Message updateMsg = new Message("kaicoTopic", "tag6", updateSql.getBytes());
        Message deleteMsg = new Message("kaicoTopic", "tag6", deleteSql.getBytes());
        DefaultMQProducer producer = rocketMQTemplate.getProducer();
        rocketMQTemplate.getProducer().send(insertMsg
                , new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                               Object arg) {
                        // 該訊息存放到佇列0中
                        return  mqs.get(0);
                    }
                }, orderId);
        rocketMQTemplate.getProducer().send(updateMsg
                , new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                               Object arg) {
                        // 該訊息存放到佇列0中
                        return mqs.get(0);
                    }
                }, orderId);
        rocketMQTemplate.getProducer().send(deleteMsg
                , new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                               Object arg) {
                        // 該訊息存放到佇列0中
                        return  mqs.get(0);
                    }
                }, orderId);
        return orderId + "";
    }

消費者程式碼

@Service
@RocketMQMessageListener(topic = "kaicoTopic", selectorExpression ="tag6", consumerGroup = "kaico_consumer1",
        messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.ORDERLY, consumeThreadMax = 1)
public class OrdeConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt msg) {
        System.out.println(Thread.currentThread().getName() + "-kaico_consumer1消費者接收物件:佇列" + msg.getQueueId()
                + "=訊息:" +  new String(msg.getBody()));
    }
}

分散式事務

實現思路

  • 生產者(傳送方)投遞事務訊息到Broker中,設定該訊息為半訊息 不可以被消費;
  • 開始執行我們的本地事務,將本地事務執行的結果(回滾或者提交)傳送給Broker
  • Broker獲取回滾或者提交,如果是回滾的情況則刪除該訊息、如果是提交的話,該訊息就可以被消費者消費;
  • Broker如果沒有及時的獲取傳送方本地事務結果的話,會主動查詢本地事務結果。

1、生產者傳送事務訊息sendMessageInTransaction

 public String saveOrder() {
     // 提前生成我們的訂單id
     String orderId = System.currentTimeMillis() + "";
     /**
      * 1.提前生成我們的半訊息
      * 2.半訊息傳送成功之後,在執行我們的本地事務
      */
     OrderEntity orderEntity = createOrder(orderId);
     String msg = JSONObject.toJSONString(orderEntity);
     MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(msg);
     stringMessageBuilder.setHeader("msg", msg);
     Message message = stringMessageBuilder.build();
     // 該訊息不允許被消費者消費,生產者的事務邏輯程式碼在生產者事務監聽類中executeLocalTransaction方法中執行。
     rocketMQTemplate.sendMessageInTransaction("kaicoProducer",
             "orderTopic", message, null);
     return orderId;

}

2、事務監聽類

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "kaicoProducer") //這個mayiktProducer生產者的事務管理
public class SyncProducerListener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private TransationalUtils transationalUtils;
    /**
     * 執行我們訂單的事務
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();
        //拿到訊息
        Object object = headers.get("msg");
        if (object == null) {
            return null;
        }
        String orderMsg = (String) object;
        OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
        TransactionStatus begin = null;
        try {
            begin = transationalUtils.begin();
            int result = orderMapper.addOrder(orderEntity);
            transationalUtils.commit(begin);
            if (result <= 0) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            // 告訴我們的Broke可以消費者該訊息
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            if (begin != null) {
                transationalUtils.rollback(begin);
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
        //add.Order
        return null;
    }
    /**
     * 提供給我們的Broker定時檢查
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        Object object = headers.get("msg");
        if (object == null) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        String orderMsg = (String) object;
        OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
        String orderId = orderEntity.getOrderId();
        // 直接查詢我們的資料庫
        OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
        if (orderDbEntity == null) {
            //不確認,繼續重試
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        //提交事務
        return RocketMQLocalTransactionState.COMMIT;
    }
}

3、消費者消費訊息

@Service
@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "kaicoTopic")
public class OrdeConsumer implements RocketMQListener<String> {
    @Autowired
    private DispatchMapper dispatchMapper;
    @Override
    public void onMessage(String msg) {
        OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class);
        String orderId = orderEntity.getOrderId();
        // 模擬userid為=123456
        DispatchEntity dispatchEntity = new DispatchEntity(orderId, 123456L);
        dispatchMapper.insertDistribute(dispatchEntity);
    }
}

到此這篇關於SpringBoot整合rockerMQ訊息佇列詳解的文章就介紹到這了,更多相關SpringBoot整合rockerMQ內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com