<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
上一篇文章中我們主要來看RocketMQ訊息消費者是如何啟動的,
那他有一個步驟是非常重要的,就是啟動訊息的監聽,通過不斷的拉取訊息,來實現訊息的監聽,那具體怎麼做,讓我們我們跟著原始碼來學習一下~
這一塊的程式碼比較多,我自己對關鍵點的一些整理,這個圖我畫的不是很OK
入口:this.pullMessageService.start();
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
宣告一個阻塞佇列用來存放 PullRequest 物件
PullRequest 用於訊息拉取任務,如果 pullRequestQueue 為空則會阻塞,直到拉取任務被放入
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
將 stopped 用volatile來修飾,每次執行的時候都檢測stopped的狀態,執行緒只要修改了這個狀態,其餘執行緒就會馬上知道
protected volatile boolean stopped = false; @Override public void run() { log.info(this.getServiceName() + " service started"); // 判斷啟動狀態 while (!this.isStopped()) { try { // 取出一個PullRequest物件 PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
PullMessageService 從訊息伺服器預設拉取32條訊息,按訊息的偏移量順序存放在 ProcessQueue 佇列
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
// 獲取消費佇列快照 final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; } // 設定最後一次拉取時間 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
// 校驗狀態 this.makeSureStateOK(); private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The consumer service state not OK, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }
如果消費者狀態不正確,則丟擲異常,啟動定時執行緒池過段時間回收 PullRequest 物件,以便pullMessageService能及時喚醒並再次執行訊息拉取,這個邏輯在多個地方使用到了
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { if (!isStopped()) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { PullMessageService.this.executePullRequestImmediately(pullRequest); } }, timeDelay, TimeUnit.MILLISECONDS); } else { log.warn("PullMessageServiceScheduledThread has shutdown"); } }
public void executePullRequestImmediately(final PullRequest pullRequest) { try { // 最後將pullRequest放入pullRequestQueue中 this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
如果觸發流量控制,則延遲拉取訊息,先將 PullRequest 物件進行回收,以便pullMessageService能及時喚醒並再次執行訊息拉取
// 快取訊息條數 long cachedMessageCount = processQueue.getMsgCount().get(); // 快取訊息的大小 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); // 當佇列中的訊息跳過,超過設定 則延遲拉取訊息 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; }
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; }
這裡通過查詢 subscriptionInner Map容器,利用主題來獲取對應的訂閱關係,如果沒有找到對應的訂閱關係,則延遲拉取訊息,先將 PullRequest 物件進行回收以便 pullMessageService 能及時喚醒並再次執行訊息拉取
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.warn("find the consumer's subscription failed, {}", pullRequest); return; }
通過消費者啟動的模組中,我們知道RocketMQ是根據不同模式,將訊息進度儲存在不同的地方
廣播模式:訊息進度儲存在本地檔案
叢集模式:訊息進度儲存在Broker 伺服器上
boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { // 從記憶體中讀取位置 commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } }
入口:org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
public PullResult pullKernelImpl( final MessageQueue mq, final String subExpression, final String expressionType, final long subVersion, final long offset, final int maxNums, final int sysFlag, final long commitOffset, final long brokerSuspendMaxTimeMillis, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}
我們看到他有非常多的引數
// step 1 通過BrokerName找到對應的Broker FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
// step 2 如果沒有找到對應的,則更新路由資訊 if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); }
// check version if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); }
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType);
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
因為 CommunicationMode 傳遞的是ASYNC,我們著重來看一下這個方法
入口: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
呼叫 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()
這裡我們就先不細看了
// 處理pullResult資料 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
主要做了三件事,轉換訊息格式、設定訊息資訊、放入msgFoundList
將pullResult 轉成 PullResultExt,轉換訊息格式為List
PullResultExt pullResultExt = (PullResultExt) pullResult; // 轉換訊息格式為List ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
執行訊息過濾,匹配符合的tag
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); for (MessageExt msg : msgList) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } }
設定訊息的transactionId、擴充套件屬性、BrokerName名稱,放入List中
for (MessageExt msg : msgListFilterAgain) { String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (Boolean.parseBoolean(traFlag)) { msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); } MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, Long.toString(pullResult.getMinOffset())); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, Long.toString(pullResult.getMaxOffset())); msg.setBrokerName(mq.getBrokerName()); } pullResultExt.setMsgFoundList(msgListFilterAgain);
// 獲取第一條訊息的offset firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
主要做了兩件事,迴圈讀取訊息list,存入msgTreeMap和計算此次讀取資訊偏移量
public boolean putMessage(final List<MessageExt> msgs) { boolean dispatchToConsume = false; try { // 上鎖 this.treeMapLock.writeLock().lockInterruptibly(); try { int validMsgCnt = 0; // 迴圈讀取訊息list,存入msgTreeMap for (MessageExt msg : msgs) { MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg); if (null == old) { validMsgCnt++; this.queueOffsetMax = msg.getQueueOffset(); msgSize.addAndGet(msg.getBody().length); } } msgCount.addAndGet(validMsgCnt); if (!msgTreeMap.isEmpty() && !this.consuming) { dispatchToConsume = true; this.consuming = true; } if (!msgs.isEmpty()) { // 獲取最後一條訊息 MessageExt messageExt = msgs.get(msgs.size() - 1); // 獲取最大偏移量 String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET); ... } } finally { this.treeMapLock.writeLock().unlock(); } } ... }
// 提交消費請求,訊息提交到內部的執行緒池 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#submitConsumeRequest
獲取 ConsumeRequest物件,拿到當前主題的監聽器
這裡拿到的監聽器,就是我們在啟動消費者的時候所註冊的,監聽到訊息後執行相關的業務邏輯
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ... return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
在這裡觸發我們在一開始重寫的consumeMessage方法,這裡msgs用Collections.unmodifiableList進行包裝,意思就是不可以修改的,是一個唯讀的List
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; ... } switch (this.defaultMQPushConsumer.getMessageModel()) { ... case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } // 如果存在失敗訊息,則過5秒在定時執行 if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; ... } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); // 更新Offset位置 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore() .updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
入口:
org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
目前只是將整體的一個消費端監聽訊息的流程瞭解清楚,裡面還有許多細節需要去推敲~
以上就是詳解RocketMQ 消費端如何監聽訊息的詳細內容,更多關於RocketMQ 消費端監聽訊息的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45