首頁 > 軟體

RocketMQ broker 訊息投遞流程處理PULL_MESSAGE請求解析

2023-04-04 06:00:29

RocketMq訊息處理

RocketMq訊息處理整個流程如下:

本系列RocketMQ4.8註釋github地址,希望對大家有所幫助,要是覺得可以的話麻煩給點一下Star哈

  • 訊息接收:訊息接收是指接收producer的訊息,處理類是SendMessageProcessor,將訊息寫入到commigLog檔案後,接收流程處理完畢;
  • 訊息分發:broker處理訊息分發的類是ReputMessageService,它會啟動一個執行緒,不斷地將commitLong分到到對應的consumerQueue,這一步操作會寫兩個檔案:consumerQueueindexFile,寫入後,訊息分發流程處理 完畢;
  • 訊息投遞:訊息投遞是指將訊息發往consumer的流程,consumer會發起獲取訊息的請求,broker收到請求後,呼叫PullMessageProcessor類處理,從consumerQueue檔案獲取訊息,返回給consumer後,投遞流程處理完畢。

以上就是rocketMq處理訊息的流程了,接下來我們就從原始碼來分析訊息投遞的實現。

1. 處理PULL_MESSAGE請求

producer不同,consumerbroker拉取訊息時,傳送的請求codePULL_MESSAGEprocessorPullMessageProcessor,我們直接進入它的processRequest方法:

@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    // 呼叫方法
    return this.processRequest(ctx.channel(), request, true);
}

這個方法就只是呼叫了一個過載方法,多出來的引數true表示允許broker掛起請求,我們繼續,

/**
 * 繼續處理
 */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, 
        boolean brokerAllowSuspend)throws RemotingCommandException {
    RemotingCommand response = RemotingCommand
        .createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader 
        = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) 
        request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
    response.setOpaque(request.getOpaque());
    // 省略許可權校驗流程
    // 1. rocketMq 可以設定校驗資訊,以阻擋非法使用者端的連線
    // 2. 同時,對topic可以設定DENY(拒絕)、ANY(PUB 或者 SUB 許可權)、PUB(傳送許可權)、SUB(訂閱許可權)等許可權,
    //    可以細粒度控制使用者端對topic的操作內容
    ...
    // 獲取訂閱組
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager()
        .findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    ...
    // 獲取訂閱主題
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager()
        .selectTopicConfig(requestHeader.getTopic());
    ...
    // 處理filter
    // consumer在訂閱訊息時,可以對訂閱的訊息進行過濾,過濾方法有兩種:tag與sql92
    // 這裡我們重點關注拉取訊息的流程,具體的過濾細節後面再分析
    ...
    // 獲取訊息
    // 1. 根據 topic 與 queueId 獲取 ConsumerQueue 檔案
    // 2. 根據 ConsumerQueue 檔案的資訊,從 CommitLog 中獲取訊息內容
    final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), 
        requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    if (getMessageResult != null) {
        // 省略一大堆的校驗過程
        ...
        switch (response.getCode()) {
            // 表示訊息可以處理,這裡會把訊息內容寫入到 response 中
            case ResponseCode.SUCCESS:
                ...
                // 處理訊息訊息內容,就是把訊息從 getMessageResult 讀出來,放到 response 中
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    final long beginTimeMills = this.brokerController.getMessageStore().now();
                    // 將訊息內容轉為byte陣列
                    final byte[] r = this.readGetMessageResult(getMessageResult, 
                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), 
                        requestHeader.getQueueId());
                    ...
                    response.setBody(r);
                } else {
                    try {
                        // 訊息轉換
                        FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(
                            getMessageResult.getBufferTotalSize()), getMessageResult);
                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                            ...
                        });
                    } catch (Throwable e) {
                        ...
                    }
                    response = null;
                }
                break;
            // 未找到滿足條件的訊息
            case ResponseCode.PULL_NOT_FOUND:
                // 如果支援掛起,就掛起當前請求
                if (brokerAllowSuspend && hasSuspendFlag) {
                    ...
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, 
                        messageFilter);
                    // 沒有找到相關的訊息,掛起操作
                    this.brokerController.getPullRequestHoldService()
                        .suspendPullRequest(topic, queueId, pullRequest);
                    response = null;
                    break;
                }
            // 省略其他型別的處理
            ...
                break;
            default:
                assert false;
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store getMessage return null");
    }
    ...
    return response;
}

在原始碼中,這個方法也是非常長,這裡我抹去了各種細枝末節,僅留下了一些重要的流程,整個處理流程如下:

  • 許可權校驗:rocketMq 可以設定校驗資訊,以阻擋非法使用者端的連線,同時也可以設定使用者端的釋出、訂閱許可權,細節度控制存取許可權;
  • 獲取訂閱組、訂閱主題等,這塊主要是通過請求訊息裡的內容獲取broker中對應的記錄
  • 建立過濾元件:consumer在訂閱訊息時,可以對訂閱的訊息進行過濾,過濾方法有兩種:tagsql92
  • 獲取訊息:先是根據 topicqueueId 獲取 ConsumerQueue 檔案,根據 ConsumerQueue 檔案的資訊,從 CommitLog 中獲取訊息內容,訊息的過濾操作也是發生在這一步
  • 轉換訊息:如果獲得了訊息,就是把具體的訊息內容,複製到reponse
  • 掛起請求:如果沒獲得訊息,而當前請求又支援掛起,就掛起當前請求

以上程式碼還是比較清晰的,相關流程程式碼中都作了註釋。

以上流程就是整個訊息的獲取流程了,在本文中,我們僅關注與獲取訊息相關的步驟,重點關注以下兩個操作:

  • 獲取訊息
  • 掛起請求

2. 獲取訊息

獲取訊息的方法為DefaultMessageStore#getMessage,程式碼如下:

public GetMessageResult getMessage(final String group, final String topic, final int queueId, 
        final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
    // 省略一些判斷
    ...
    // 根據topic與queueId一個ConsumeQueue,consumeQueue記錄的是訊息在commitLog的位置
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();
        if (...) {
            // 判斷 offset 是否符合要求
            ...
        } else {
            // 從 consumerQueue 檔案中獲取訊息
            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
            if (bufferConsumeQueue != null) {
                ...
                for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; 
                    i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 省略一大堆的訊息過濾操作
                    ...
                    // 從 commitLong 獲取訊息
                    SelectMappedBufferResult selectResult 
                            = this.commitLog.getMessage(offsetPy, sizePy);
                    if (null == selectResult) {
                        if (getResult.getBufferTotalSize() == 0) {
                            status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                        }
                        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                        continue;
                    }
                    // 省略一大堆的訊息過濾操作
                    ...
                }
            }
    } else {
        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    }
    if (GetMessageStatus.FOUND == status) {
        this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
    } else {
        this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
    }
    long elapsedTime = this.getSystemClock().now() - beginTime;
    this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
    getResult.setStatus(status);
    // 又是處理 offset
    getResult.setNextBeginOffset(nextBeginOffset);
    getResult.setMaxOffset(maxOffset);
    getResult.setMinOffset(minOffset);
    return getResult;
}

這個方法不是比較長的,這裡僅保留了關鍵流程,獲取訊息的關鍵流程如下:

  • 根據topicqueueId找到ConsumerQueue
  • ConsumerQueue對應的檔案中獲取訊息資訊,如taghashCode、訊息在commitLog中的位置資訊
  • 根據位置資訊,從commitLog中獲取完整的訊息

經過以上步驟,訊息就能獲取到了,不過在獲取訊息的前後,會進行訊息過濾操作,即根據tagsql語法來過濾訊息,關於訊息過濾的一些細節,我們留到後面訊息過濾相關章節作進一步分析。

3. 掛起請求:PullRequestHoldService#suspendPullRequest

broker無新訊息時,consumer拉取訊息的請求就會掛起,方法為PullRequestHoldService#suspendPullRequest

public class PullRequestHoldService extends ServiceThread {
    private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);
    public void suspendPullRequest(final String topic, final int queueId, 
            final PullRequest pullRequest) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (null == mpr) {
            mpr = new ManyPullRequest();
            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
            if (prev != null) {
                mpr = prev;
            }
        }
        mpr.addPullRequest(pullRequest);
    }
    ...
}

suspendPullRequest方法中,所做的工作僅是把當前請求放入pullRequestTable中了。從程式碼中可以看到,pullRequestTable是一個ConcurrentMapkeytopic@queueIdvalue 就是掛起的請求了。

請求掛起後,何時處理呢?這就是PullRequestHoldService執行緒的工作了。

3.1 處理掛起請求的執行緒:PullRequestHoldService

看完PullRequestHoldService#suspendPullRequest方法後,我們再來看看PullRequestHoldService

PullRequestHoldServiceServiceThread的子類(上一次看到ServiceThread的子類還是ReputMessageService),它也會啟動一個新執行緒來處理掛起操作。

我們先來看看它是在哪裡啟動PullRequestHoldService的執行緒的,在BrokerController的啟動方法start()中有這麼一行:

BrokerController#start

public void start() throws Exception {
    ...
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }
    ...
}

這裡就是啟動pullRequestHoldService的執行緒操作了。

為了探究這個執行緒做了什麼,我們進入PullRequestHoldService#run方法:

@Override
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            // 等待中
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(
                    this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            // 檢查操作
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}

從程式碼來看,這個執行緒先是進行等待,然後呼叫PullRequestHoldService#checkHoldRequest方法,看來關注就是這個方法了,它的程式碼如下:

private void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore()
                .getMaxOffsetInQueue(topic, queueId);
            try {
                // 呼叫notifyMessageArriving方法操作
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error(...);
            }
        }
    }
}

這個方法呼叫了PullRequestHoldService#notifyMessageArriving(...),我們繼續進入:

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
    // 繼續呼叫
    notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
/**
 * 這個方法就是最終呼叫的了
 */
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, 
    final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();
            for (PullRequest request : requestList) {
                // 判斷是否有新訊息到達,要根據 comsumerQueue 的偏移量與request的偏移量判斷
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore()
                        .getMaxOffsetInQueue(topic, queueId);
                }
                if (newestOffset > request.getPullFromThisOffset()) {
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    if (match && properties != null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }
                    if (match) {
                        try {
                            // 喚醒操作
                            this.brokerController.getPullMessageProcessor()
                                .executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }
                // 超時時間到了
                if (System.currentTimeMillis() >= 
                        (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        // 喚醒操作
                        this.brokerController.getPullMessageProcessor()
                            .executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }
                replayList.add(request);
            }
            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}

這個方法就是用來檢查是否有新訊息送達的操作了,方法雖然有點長,但可以用一句話來總結:如果有新訊息送達,或者pullRquest hold住的時間到了,就喚醒pullRquest(即呼叫PullMessageProcessor#executeRequestWhenWakeup方法)。

  • 在判斷是否有新訊息送達時,會獲取comsumerQueue檔案中的最大偏移量,與當前pullRquest中的偏移量進行比較,如果前者大,就表示有新訊息送達了,需要喚醒pullRquest
  • 前面說過,當consumer請求沒獲取到訊息時,brokerhold這個請求一段時間(30s),當這個時間到了,也會喚醒pullRquest,之後就不會再hold住它了

3.2 喚醒請求:PullMessageProcessor#executeRequestWhenWakeup

我們再來看看 PullMessageProcessor#executeRequestWhenWakeup 方法:

public void executeRequestWhenWakeup(final Channel channel,
    final RemotingCommand request) throws RemotingCommandException {
    // 關注 Runnable#run() 方法即可
    Runnable run = new Runnable() {
        @Override
        public void run() {
            try {
                // 再一次呼叫 PullMessageProcessor#processRequest(...) 方法
                final RemotingCommand response = PullMessageProcessor.this
                    .processRequest(channel, request, false);
                ...
            } catch (RemotingCommandException e1) {
                log.error("excuteRequestWhenWakeup run", e1);
            }
        }
    };
    // 提交任務
    this.brokerController.getPullMessageExecutor()
        .submit(new RequestTask(run, channel, request));
}

這個方法準備了一個任務,然後將其提交到執行緒池中執行,任務內容很簡單,僅是呼叫了PullMessageProcessor#processRequest(...) 方法,這個方法就是本節一始提到的處理consumer拉取訊息的方法了。

3.3 訊息分發中喚醒consumer請求

在分析訊息分發流程時,DefaultMessageStore.ReputMessageService#doReput方法中有這麼一段:

private void doReput() {
    ...
    // 分發訊息
    DefaultMessageStore.this.doDispatch(dispatchRequest);
    // 長輪詢:如果有訊息到了主節點,並且開啟了長輪詢
    if (BrokerRole.SLAVE != DefaultMessageStore.this
            .getMessageStoreConfig().getBrokerRole()
            &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
        // 呼叫NotifyMessageArrivingListener的arriving方法
        DefaultMessageStore.this.messageArrivingListener.arriving(
            dispatchRequest.getTopic(),
            dispatchRequest.getQueueId(), 
            dispatchRequest.getConsumeQueueOffset() + 1,
            dispatchRequest.getTagsCode(), 
            dispatchRequest.getStoreTimestamp(),
            dispatchRequest.getBitMap(), 
            dispatchRequest.getPropertiesMap());
    }
    ...
}

這段就是用來主動喚醒hold住的consumer請求的,我們進入NotifyMessageArrivingListener#arriving方法:

 @Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
        msgStoreTime, filterBitMap, properties);
}

最終它也是呼叫了 PullRequestHoldService#notifyMessageArriving(...) 方法。

總結

本文主要分析了broker處理PULL_MESSAGE請求的流程,總結如下:

  • broker處理PULL_MESSAGEprocessorPullMessageProcessorPullMessageProcessorprocessRequest(...)就是整個訊息獲取流程了
  • broker在獲取訊息時,先根據請求的topicqueueId找到consumerQueue,然後根據請求中的offset引數從consumerQueue檔案中找到訊息在commitLog的位置資訊,最後根據位置資訊從commitLog中獲取訊息內容
  • 如果broker中沒有當前consumerQueue的訊息,broker會掛起當前執行緒,直到超時(預設30s)或收到新的訊息時再喚醒

參考  

RocketMQ原始碼分析專欄

以上就是RocketMQ broker 訊息投遞流程處理PULL_MESSAGE請求解析的詳細內容,更多關於RocketMQ broker 訊息投遞的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com