首頁 > 軟體

RocketMQ訊息拉取過程詳解

2022-12-17 14:00:13

前言

在上一篇文章中,我們講述了DefaultMQPushConsumer拉訊息的原理,它是通過重平衡觸發pullRequest的建立,通過阻塞佇列作為pullRequest的儲存容器,另一端通過定時任務從阻塞佇列中取出pullRequest來向Broker傳送拉訊息的請求,無論訊息拉取成功還是失敗,都會重新把pullRequest放回阻塞佇列中,這樣就能保證持續不斷地向Broker拉訊息了;

今天這篇文章我們繼續講述DefaultLitePullConsumer是如何實現訊息拉取的;

DefaultLitePullConsumer拉訊息程式碼範例

我們在使用DefaultLitePullConsumer時都是主動去poll訊息,並不是像DefaultMQPushConsumer那樣設定一個訊息監聽器:

DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(group);
consumer.setNamesrvAddr(nameSrv);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, subExpression);
try {
    consumer.start();
} catch (Exception e) {
    e.printStackTrace();
}
try {
    while (true) {
        List<MessageExt> messageExts = consumer.poll(5000);
        // 處理業務邏輯
        System.out.println("訊息數量:" + messageExts.size());
        System.out.println("訊息內容:");
        for (MessageExt messageExt : messageExts) {
            System.out.println(new String(messageExt.getBody(), StandardCharsets.UTF_8));
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}

一般拿到訊息後都會交給業務執行緒池去處理,上述程式碼我只簡單地列印了一下訊息內容;

訊息消費

跟著poll()方法,我們最終定位到DefaultLitePullConsumerImpl.poll()這個方法:

    public synchronized List<MessageExt> poll(long timeout) {
        try {
            this.checkServiceState();
            if (timeout < 0L) {
                throw new IllegalArgumentException("Timeout must not be negative");
            }
​
            if (this.defaultLitePullConsumer.isAutoCommit()) {
                this.maybeAutoCommit();
            }
​
            long endTime = System.currentTimeMillis() + timeout;
            // 從阻塞佇列中取ConsumeRequest
            DefaultLitePullConsumerImpl.ConsumeRequest consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            if (endTime - System.currentTimeMillis() > 0L) {
                while(consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                    consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    if (endTime - System.currentTimeMillis() <= 0L) {
                        break;
                    }
                }
            }
​
            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                List<MessageExt> messages = consumeRequest.getMessageExts();
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
              // 取到訊息後直接更新消費點位
                this.assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                this.resetTopic(messages);
                // 下面是呼叫consumeMessageHook
                if (!this.consumeMessageHookList.isEmpty()) {
                    ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
                    consumeMessageContext.setNamespace(this.defaultLitePullConsumer.getNamespace());
                    consumeMessageContext.setConsumerGroup(this.groupName());
                    consumeMessageContext.setMq(consumeRequest.getMessageQueue());
                    consumeMessageContext.setMsgList(messages);
                    consumeMessageContext.setSuccess(false);
                    this.executeHookBefore(consumeMessageContext);
                  // 預設是消費成功
                    consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
                    consumeMessageContext.setSuccess(true);
                    this.executeHookAfter(consumeMessageContext);
                }
​
                return messages;
            }
        } catch (InterruptedException var10) {
        }
​
        return Collections.emptyList();
    }

1.直接從阻塞佇列consumeRequestCache中取出訊息物件ConsumeRequest,這裡面就包含了訊息內容;

2.取出來後直接更新消費點位,預設為此次訊息消費成功;

這裡跟DefaultMQPushConsumer不同的是,DefaultLitePullConsumerImpl.poll()預設的是訊息消費一定成功,如果消費失敗的話,需要開發人員自己處理,消費失敗的訊息不會再次傳送給消費者;

那麼咱們的疑問就出來了,poll()方法光顧著從consumeRequestCache中取訊息,那訊息是啥時候放進去的呢?

訊息拉取入口

我們可以重新瞭解一下消費者重平衡過程,在MessageQueue分配完畢後,會對比被分配的MessageQueue是否和分配前的不一致,大部分情況下是會發生改變的,那麼就會觸發messageQueueChanged()方法的呼叫:

    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        // 取出所有的MessageQueueListener
        MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
        if (messageQueueListener != null) {
            try {
                // 依次呼叫messageQueueChanged方法
                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
            } catch (Throwable e) {
                log.error("messageQueueChanged exception", e);
            }
        }
    }

MessageQueueListener是什麼時候被放進去的呢?可以看一下subscribe()方法:

    public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            if (topic == null || "".equals(topic)) {
                throw new IllegalArgumentException("Topic can not be null or empty.");
            }
            setSubscriptionType(SubscriptionType.SUBSCRIBE);
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            // 每個subscribe()都會設定MessageQueueListener
            this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
            if (serviceState == ServiceState.RUNNING) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                updateTopicSubscribeInfoWhenSubscriptionChanged();
            }
        } catch (Exception e) {
            throw new MQClientException("subscribe exception", e);
        }
    }

可以發現,每個subscribe()都會設定MessageQueueListenerMessageQueueListenerImpl裡面只幹了一件事情:更新MessageQueue並且建立pullTask

    class MessageQueueListenerImpl implements MessageQueueListener {
        @Override
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
            updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
        }
    }
​
    public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
        switch (messageModel) {
            case BROADCASTING:
                updateAssignedMessageQueue(topic, mqAll);
                // 更新拉訊息任務
                updatePullTask(topic, mqAll);
                break;
            case CLUSTERING:
                updateAssignedMessageQueue(topic, mqDivided);
                // 更新拉訊息任務
                updatePullTask(topic, mqDivided);
                break;
            default:
                break;
        }
    }

現在終於快找到這個訊息拉取的入口了:

    private void startPullTask(Collection<MessageQueue> mqSet) {
        for (MessageQueue messageQueue : mqSet) {
            if (!this.taskTable.containsKey(messageQueue)) {
                // 建立訊息拉取任務
                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
                this.taskTable.put(messageQueue, pullTask);
                // 這個就是任務執行的入口
                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
            }
        }
    }

訊息拉取的入口尋找起來還是有點困難的,但是主要思路還是從【重平衡】開始,另外就是觸發了MessageQueueListener,此時才會建立pullTask

雖然DefaultMQPushConsumer也是【重平衡】觸發pullRequest的建立,但是它是將pullRequest放進阻塞佇列,另一端由訊息拉取任務去取pullRequestBroker傳送請求;而DefaultLitePullConsumer是直接建立pullTask去拉訊息;

PullTaskImpl拉訊息

很顯然,PullTaskImpl就是一個Runnable,那麼最重要的就是它的run()方法,這個方法就是負責從Broker拉訊息並放進consumeRequestCache阻塞佇列中,這樣poll()方法才能從consumeRequestCache阻塞佇列中取到訊息;

  • messageQueue暫停
if (DefaultLitePullConsumerImpl.this.assignedMessageQueue.isPaused(this.messageQueue)) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 1000L, TimeUnit.MILLISECONDS);
    DefaultLitePullConsumerImpl.this.log.debug("Message Queue: {} has been paused!", this.messageQueue);
                    return;
}

如果messageQueue處於暫停狀態,那麼延遲1秒重新執行這個任務;

  • ProcessQueue被移除
ProcessQueue processQueue = DefaultLitePullConsumerImpl.this.assignedMessageQueue.getProcessQueue(this.messageQueue);
if (null == processQueue || processQueue.isDropped()) {
    DefaultLitePullConsumerImpl.this.log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
    return;
}

如果processQueue不存在或者已經被移除了,那麼這個任務也不用執行了;

  • 流量控制
if ((long)DefaultLitePullConsumerImpl.this.consumeRequestCache.size() * (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize() > DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForAll()) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
    if (DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes++ % 1000L == 0L) {
        DefaultLitePullConsumerImpl.this.log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", DefaultLitePullConsumerImpl.this.consumeRequestCache.size(), DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes);
    }
    return;
}

如果consumeRequestCache中的訊息數量超過了PullThresholdForAll閾值,那麼觸發限流機制,當前任務將不會繼續拉訊息,並且50毫秒後才會重新執行該任務;

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 1048576L;
// 單個processQueue上面訊息數量限制
if (cachedMessageCount > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue()) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
    if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) {
        DefaultLitePullConsumerImpl.this.log.warn("The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes});
    }
    return;
}
// 單個processQueue中訊息總大小限制
if (cachedMessageSizeInMiB > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
    if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) {
        DefaultLitePullConsumerImpl.this.log.warn("The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes});
    }
    return;
}
  • 如果當前processQueue中訊息的數量大於PullThresholdForQueue閾值,也同樣觸發限流機制,當前任務不再執行,50毫秒後重新執行該任務;
  • 如果當前processQueue中訊息的總大小超過PullThresholdSizeForQueue(單位:MB)閾值,將觸發限流機制,當前任務不再執行,50毫秒後重新執行該任務;
if (processQueue.getMaxSpan() > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumeMaxSpan()) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
    if (DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes++ % 1000L == 0L) {
        DefaultLitePullConsumerImpl.this.log.warn("The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes});
    }
    return;
}

如果processQueue中的maxSpan大於消費者的ConsumeMaxSpan,也就是第一個訊息與最後一個訊息的點位偏差大於ConsumeMaxSpan(預設是2000),將觸發限流機制,當前任務不執行,50毫秒後重新執行該任務;

  • 計算拉取點位
long offset = 0L;
try {
    offset = DefaultLitePullConsumerImpl.this.nextPullOffset(this.messageQueue);
} catch (Exception var17) {
    DefaultLitePullConsumerImpl.this.log.error("Failed to get next pull offset", var17);
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 3000L, TimeUnit.MILLISECONDS);
    return;
}

計算訊息拉取的點位,如果產生異常,那麼簡隔3秒後再來重新開始任務;

  • 拉訊息
PullResult pullResult = DefaultLitePullConsumerImpl.this.pull(this.messageQueue, subscriptionData, offset, DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize());

這個就是發請求給Broker拉訊息;

  • 放進訊息快取區
switch(pullResult.getPullStatus()) {
    case FOUND:
        Object objLock = DefaultLitePullConsumerImpl.this.messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized(objLock) {
            if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && DefaultLitePullConsumerImpl.this.assignedMessageQueue.getSeekOffset(this.messageQueue) == -1L) {
                processQueue.putMessage(pullResult.getMsgFoundList());
                  DefaultLitePullConsumerImpl.this.submitConsumeRequest(DefaultLitePullConsumerImpl.this.new ConsumeRequest(pullResult.getMsgFoundList(), this.messageQueue, processQueue));
     }
     break;
}

找到訊息的情況下,將呼叫submitConsumeRequest()方法把訊息放進阻塞佇列中,等待poll()方法來消費;

  • 重新開啟拉取任務
if (!this.isCancelled()) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
    DefaultLitePullConsumerImpl.this.log.warn("The Pull Task is cancelled after doPullTask, {}", this.messageQueue);
}

如果當前任務還沒有被取消的話,那麼重新開啟下一個輪迴,準備下一次訊息拉取;

以上就是RocketMQ訊息拉取過程詳解的詳細內容,更多關於RocketMQ訊息拉取過程的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com