首頁 > 軟體

詳解RocketMQ 消費端如何監聽訊息

2022-12-17 14:00:52

前言

上一篇文章中我們主要來看RocketMQ訊息消費者是如何啟動的,

那他有一個步驟是非常重要的,就是啟動訊息的監聽,通過不斷的拉取訊息,來實現訊息的監聽,那具體怎麼做,讓我們我們跟著原始碼來學習一下~

流程地圖

原始碼跟蹤

這一塊的程式碼比較多,我自己對關鍵點的一些整理,這個圖我畫的不是很OK

核心模組(訊息拉取)

入口:this.pullMessageService.start();

  • 執行執行緒池run方法,輪流從pullRequestQueue中獲取PullRequest

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

宣告一個阻塞佇列用來存放 PullRequest 物件

PullRequest 用於訊息拉取任務,如果 pullRequestQueue 為空則會阻塞,直到拉取任務被放入

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

將 stopped 用volatile來修飾,每次執行的時候都檢測stopped的狀態,執行緒只要修改了這個狀態,其餘執行緒就會馬上知道

protected volatile boolean stopped = false;
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    // 判斷啟動狀態
    while (!this.isStopped()) {
        try {
            // 取出一個PullRequest物件
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
  • 獲取消費佇列快照,判斷狀態是否正常,同時更新最後一次拉取時間

PullMessageService 從訊息伺服器預設拉取32條訊息,按訊息的偏移量順序存放在 ProcessQueue 佇列

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

// 獲取消費佇列快照
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}
// 設定最後一次拉取時間
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
  • 校驗使用者端執行狀態
// 校驗狀態
this.makeSureStateOK();
private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The consumer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}

如果消費者狀態不正確,則丟擲異常,啟動定時執行緒池過段時間回收 PullRequest 物件,以便pullMessageService能及時喚醒並再次執行訊息拉取,這個邏輯在多個地方使用到了

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    if (!isStopped()) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    } else {
        log.warn("PullMessageServiceScheduledThread has shutdown");
    }
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        // 最後將pullRequest放入pullRequestQueue中
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}
  • 校驗消費佇列中的訊息數量和大小是否符合設定

如果觸發流量控制,則延遲拉取訊息,先將 PullRequest 物件進行回收,以便pullMessageService能及時喚醒並再次執行訊息拉取

// 快取訊息條數
long cachedMessageCount = processQueue.getMsgCount().get();
// 快取訊息的大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 當佇列中的訊息跳過,超過設定 則延遲拉取訊息
if (cachedMessageCount &gt; this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
  • 根據主題獲取設定的訂閱關係

這裡通過查詢 subscriptionInner Map容器,利用主題來獲取對應的訂閱關係,如果沒有找到對應的訂閱關係,則延遲拉取訊息,先將 PullRequest 物件進行回收以便 pullMessageService 能及時喚醒並再次執行訊息拉取

protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
    new ConcurrentHashMap<String, SubscriptionData>();
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}
  • 如果為叢集模式,則從記憶體中讀取位置

通過消費者啟動的模組中,我們知道RocketMQ是根據不同模式,將訊息進度儲存在不同的地方

廣播模式:訊息進度儲存在本地檔案

叢集模式:訊息進度儲存在Broker 伺服器上

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    // 從記憶體中讀取位置
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    if (commitOffsetValue > 0) {
        commitOffsetEnable = true;
    }
}
  • 核心中拉取訊息(最重要的模組)

入口:org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl

public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

我們看到他有非常多的引數

拉取流程

  • 通過BrokerName找到對應的Broker
// step 1 通過BrokerName找到對應的Broker
FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
        this.recalculatePullFromWhichNode(mq), false);
  • 如果沒有找到對應的,則更新路由資訊
// step 2 如果沒有找到對應的,則更新路由資訊
if (null == findBrokerResult) {
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
}
  • 檢查Broker版本和Tag資訊
// check version
if (!ExpressionType.isTagType(expressionType)
    &amp;&amp; findBrokerResult.getBrokerVersion() &lt; MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
  • 設定PullMessageRequestHeader
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
  • 呼叫pullMessage方法拉取訊息,返回拉取結果
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr,
    requestHeader,
    timeoutMillis,
    communicationMode,
    pullCallback);

因為 CommunicationMode 傳遞的是ASYNC,我們著重來看一下這個方法

入口: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync

呼叫 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()

這裡我們就先不細看了

拉取訊息處理

  • 如果PullCallback回撥成功,則對結果進行處理
// 處理pullResult資料
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
    subscriptionData);

主要做了三件事,轉換訊息格式、設定訊息資訊、放入msgFoundList

將pullResult 轉成 PullResultExt,轉換訊息格式為List

PullResultExt pullResultExt = (PullResultExt) pullResult;
// 轉換訊息格式為List
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

執行訊息過濾,匹配符合的tag

if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
    msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
    for (MessageExt msg : msgList) {
        if (msg.getTags() != null) {
            if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                msgListFilterAgain.add(msg);
            }
        }
    }
}

設定訊息的transactionId、擴充套件屬性、BrokerName名稱,放入List中

for (MessageExt msg : msgListFilterAgain) {
    String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (Boolean.parseBoolean(traFlag)) {
        msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    }
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
        Long.toString(pullResult.getMinOffset()));
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
        Long.toString(pullResult.getMaxOffset()));
    msg.setBrokerName(mq.getBrokerName());
}
pullResultExt.setMsgFoundList(msgListFilterAgain);

當pullStatus為FOUND,訊息進行提交消費的請求

  • 獲取第一條訊息的offset(偏移量)
// 獲取第一條訊息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
  • 將讀取訊息List,更新到processQueue的TreeMap裡面
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

主要做了兩件事,迴圈讀取訊息list,存入msgTreeMap和計算此次讀取資訊偏移量

public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        // 上鎖
        this.treeMapLock.writeLock().lockInterruptibly();
        try {
            int validMsgCnt = 0;
            // 迴圈讀取訊息list,存入msgTreeMap
            for (MessageExt msg : msgs) {
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                if (null == old) {
                    validMsgCnt++;
                    this.queueOffsetMax = msg.getQueueOffset();
                    msgSize.addAndGet(msg.getBody().length);
                }
            }
            msgCount.addAndGet(validMsgCnt);
            if (!msgTreeMap.isEmpty() && !this.consuming) {
                dispatchToConsume = true;
                this.consuming = true;
            }
            if (!msgs.isEmpty()) {
                // 獲取最後一條訊息
                MessageExt messageExt = msgs.get(msgs.size() - 1);
                // 獲取最大偏移量
                String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                ...
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    }
    ...
}
  • 提交消費請求,訊息提交到內部的執行緒池
// 提交消費請求,訊息提交到內部的執行緒池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#submitConsumeRequest

獲取 ConsumeRequest物件,拿到當前主題的監聽器

這裡拿到的監聽器,就是我們在啟動消費者的時候所註冊的,監聽到訊息後執行相關的業務邏輯

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
               ...
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

在這裡觸發我們在一開始重寫的consumeMessage方法,這裡msgs用Collections.unmodifiableList進行包裝,意思就是不可以修改的,是一個唯讀的List

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  • ProcessQueue中移除已經處理的訊息,同時更新Offset位置
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();
        if (consumeRequest.getMsgs().isEmpty())
            return;
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            ...
        }
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            ...
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
                // 如果存在失敗訊息,則過5秒在定時執行
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
                ...
        }
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        // 更新Offset位置  
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
  • 最後pullRequest放入pullRequestQueue中

入口:

org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

訊息消費進度提交

  • 成功消費一條訊息後,更新本地快取表
  • 每5s向Broker提交訊息消費進度
  • Broker每5s將進度持久化到consumerOffset.json

總結

目前只是將整體的一個消費端監聽訊息的流程瞭解清楚,裡面還有許多細節需要去推敲~

以上就是詳解RocketMQ 消費端如何監聽訊息的詳細內容,更多關於RocketMQ 消費端監聽訊息的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com