<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在上一篇文章中,我們講述了DefaultMQPushConsumer
拉訊息的原理,它是通過重平衡觸發pullRequest
的建立,通過阻塞佇列作為pullRequest
的儲存容器,另一端通過定時任務從阻塞佇列中取出pullRequest
來向Broker
傳送拉訊息的請求,無論訊息拉取成功還是失敗,都會重新把pullRequest
放回阻塞佇列中,這樣就能保證持續不斷地向Broker
拉訊息了;
今天這篇文章我們繼續講述DefaultLitePullConsumer
是如何實現訊息拉取的;
我們在使用DefaultLitePullConsumer
時都是主動去poll
訊息,並不是像DefaultMQPushConsumer
那樣設定一個訊息監聽器:
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(group); consumer.setNamesrvAddr(nameSrv); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe(topic, subExpression); try { consumer.start(); } catch (Exception e) { e.printStackTrace(); } try { while (true) { List<MessageExt> messageExts = consumer.poll(5000); // 處理業務邏輯 System.out.println("訊息數量:" + messageExts.size()); System.out.println("訊息內容:"); for (MessageExt messageExt : messageExts) { System.out.println(new String(messageExt.getBody(), StandardCharsets.UTF_8)); } } } catch (Exception e) { e.printStackTrace(); }
一般拿到訊息後都會交給業務執行緒池去處理,上述程式碼我只簡單地列印了一下訊息內容;
跟著poll()
方法,我們最終定位到DefaultLitePullConsumerImpl.poll()
這個方法:
public synchronized List<MessageExt> poll(long timeout) { try { this.checkServiceState(); if (timeout < 0L) { throw new IllegalArgumentException("Timeout must not be negative"); } if (this.defaultLitePullConsumer.isAutoCommit()) { this.maybeAutoCommit(); } long endTime = System.currentTimeMillis() + timeout; // 從阻塞佇列中取ConsumeRequest DefaultLitePullConsumerImpl.ConsumeRequest consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (endTime - System.currentTimeMillis() > 0L) { while(consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (endTime - System.currentTimeMillis() <= 0L) { break; } } } if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) { List<MessageExt> messages = consumeRequest.getMessageExts(); long offset = consumeRequest.getProcessQueue().removeMessage(messages); // 取到訊息後直接更新消費點位 this.assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); this.resetTopic(messages); // 下面是呼叫consumeMessageHook if (!this.consumeMessageHookList.isEmpty()) { ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(this.defaultLitePullConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(this.groupName()); consumeMessageContext.setMq(consumeRequest.getMessageQueue()); consumeMessageContext.setMsgList(messages); consumeMessageContext.setSuccess(false); this.executeHookBefore(consumeMessageContext); // 預設是消費成功 consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); consumeMessageContext.setSuccess(true); this.executeHookAfter(consumeMessageContext); } return messages; } } catch (InterruptedException var10) { } return Collections.emptyList(); }
1.直接從阻塞佇列consumeRequestCache
中取出訊息物件ConsumeRequest
,這裡面就包含了訊息內容;
2.取出來後直接更新消費點位,預設為此次訊息消費成功;
這裡跟DefaultMQPushConsumer
不同的是,DefaultLitePullConsumerImpl.poll()
預設的是訊息消費一定成功,如果消費失敗的話,需要開發人員自己處理,消費失敗的訊息不會再次傳送給消費者;
那麼咱們的疑問就出來了,poll()
方法光顧著從consumeRequestCache
中取訊息,那訊息是啥時候放進去的呢?
我們可以重新瞭解一下消費者重平衡過程,在MessageQueue
分配完畢後,會對比被分配的MessageQueue
是否和分配前的不一致,大部分情況下是會發生改變的,那麼就會觸發messageQueueChanged()
方法的呼叫:
@Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { // 取出所有的MessageQueueListener MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener(); if (messageQueueListener != null) { try { // 依次呼叫messageQueueChanged方法 messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); } catch (Throwable e) { log.error("messageQueueChanged exception", e); } } }
MessageQueueListener
是什麼時候被放進去的呢?可以看一下subscribe()
方法:
public synchronized void subscribe(String topic, String subExpression) throws MQClientException { try { if (topic == null || "".equals(topic)) { throw new IllegalArgumentException("Topic can not be null or empty."); } setSubscriptionType(SubscriptionType.SUBSCRIBE); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); // 每個subscribe()都會設定MessageQueueListener this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl); if (serviceState == ServiceState.RUNNING) { this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); updateTopicSubscribeInfoWhenSubscriptionChanged(); } } catch (Exception e) { throw new MQClientException("subscribe exception", e); } }
可以發現,每個subscribe()都會設定MessageQueueListener
,MessageQueueListenerImpl
裡面只幹了一件事情:更新MessageQueue
並且建立pullTask
;
class MessageQueueListenerImpl implements MessageQueueListener { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided); } } public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { MessageModel messageModel = defaultLitePullConsumer.getMessageModel(); switch (messageModel) { case BROADCASTING: updateAssignedMessageQueue(topic, mqAll); // 更新拉訊息任務 updatePullTask(topic, mqAll); break; case CLUSTERING: updateAssignedMessageQueue(topic, mqDivided); // 更新拉訊息任務 updatePullTask(topic, mqDivided); break; default: break; } }
現在終於快找到這個訊息拉取的入口了:
private void startPullTask(Collection<MessageQueue> mqSet) { for (MessageQueue messageQueue : mqSet) { if (!this.taskTable.containsKey(messageQueue)) { // 建立訊息拉取任務 PullTaskImpl pullTask = new PullTaskImpl(messageQueue); this.taskTable.put(messageQueue, pullTask); // 這個就是任務執行的入口 this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS); } } }
訊息拉取的入口尋找起來還是有點困難的,但是主要思路還是從【重平衡】開始,另外就是觸發了MessageQueueListener
,此時才會建立pullTask
;
雖然DefaultMQPushConsumer
也是【重平衡】觸發pullRequest
的建立,但是它是將pullRequest
放進阻塞佇列,另一端由訊息拉取任務去取pullRequest
向Broker
傳送請求;而DefaultLitePullConsumer
是直接建立pullTask
去拉訊息;
很顯然,PullTaskImpl
就是一個Runnable
,那麼最重要的就是它的run()
方法,這個方法就是負責從Broker
拉訊息並放進consumeRequestCache
阻塞佇列中,這樣poll()
方法才能從consumeRequestCache
阻塞佇列中取到訊息;
messageQueue
暫停if (DefaultLitePullConsumerImpl.this.assignedMessageQueue.isPaused(this.messageQueue)) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 1000L, TimeUnit.MILLISECONDS); DefaultLitePullConsumerImpl.this.log.debug("Message Queue: {} has been paused!", this.messageQueue); return; }
如果messageQueue
處於暫停狀態,那麼延遲1秒重新執行這個任務;
ProcessQueue
被移除ProcessQueue processQueue = DefaultLitePullConsumerImpl.this.assignedMessageQueue.getProcessQueue(this.messageQueue); if (null == processQueue || processQueue.isDropped()) { DefaultLitePullConsumerImpl.this.log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumerGroup(), this.messageQueue); return; }
如果processQueue
不存在或者已經被移除了,那麼這個任務也不用執行了;
if ((long)DefaultLitePullConsumerImpl.this.consumeRequestCache.size() * (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize() > DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForAll()) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS); if (DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes++ % 1000L == 0L) { DefaultLitePullConsumerImpl.this.log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", DefaultLitePullConsumerImpl.this.consumeRequestCache.size(), DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes); } return; }
如果consumeRequestCache
中的訊息數量超過了PullThresholdForAll
閾值,那麼觸發限流機制,當前任務將不會繼續拉訊息,並且50毫秒後才會重新執行該任務;
long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 1048576L; // 單個processQueue上面訊息數量限制 if (cachedMessageCount > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue()) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS); if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) { DefaultLitePullConsumerImpl.this.log.warn("The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes}); } return; } // 單個processQueue中訊息總大小限制 if (cachedMessageSizeInMiB > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue()) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS); if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) { DefaultLitePullConsumerImpl.this.log.warn("The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes}); } return; }
processQueue
中訊息的數量大於PullThresholdForQueue
閾值,也同樣觸發限流機制,當前任務不再執行,50毫秒後重新執行該任務;processQueue
中訊息的總大小超過PullThresholdSizeForQueue
(單位:MB
)閾值,將觸發限流機制,當前任務不再執行,50毫秒後重新執行該任務;if (processQueue.getMaxSpan() > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumeMaxSpan()) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS); if (DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes++ % 1000L == 0L) { DefaultLitePullConsumerImpl.this.log.warn("The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes}); } return; }
如果processQueue
中的maxSpan
大於消費者的ConsumeMaxSpan
,也就是第一個訊息與最後一個訊息的點位偏差大於ConsumeMaxSpan
(預設是2000),將觸發限流機制,當前任務不執行,50毫秒後重新執行該任務;
long offset = 0L; try { offset = DefaultLitePullConsumerImpl.this.nextPullOffset(this.messageQueue); } catch (Exception var17) { DefaultLitePullConsumerImpl.this.log.error("Failed to get next pull offset", var17); DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 3000L, TimeUnit.MILLISECONDS); return; }
計算訊息拉取的點位,如果產生異常,那麼簡隔3秒後再來重新開始任務;
PullResult pullResult = DefaultLitePullConsumerImpl.this.pull(this.messageQueue, subscriptionData, offset, DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize());
這個就是發請求給Broker
拉訊息;
switch(pullResult.getPullStatus()) { case FOUND: Object objLock = DefaultLitePullConsumerImpl.this.messageQueueLock.fetchLockObject(this.messageQueue); synchronized(objLock) { if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && DefaultLitePullConsumerImpl.this.assignedMessageQueue.getSeekOffset(this.messageQueue) == -1L) { processQueue.putMessage(pullResult.getMsgFoundList()); DefaultLitePullConsumerImpl.this.submitConsumeRequest(DefaultLitePullConsumerImpl.this.new ConsumeRequest(pullResult.getMsgFoundList(), this.messageQueue, processQueue)); } break; }
找到訊息的情況下,將呼叫submitConsumeRequest()
方法把訊息放進阻塞佇列中,等待poll()
方法來消費;
if (!this.isCancelled()) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS); } else { DefaultLitePullConsumerImpl.this.log.warn("The Pull Task is cancelled after doPullTask, {}", this.messageQueue); }
如果當前任務還沒有被取消的話,那麼重新開啟下一個輪迴,準備下一次訊息拉取;
以上就是RocketMQ訊息拉取過程詳解的詳細內容,更多關於RocketMQ訊息拉取過程的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45