<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
RocketMq
訊息處理整個流程如下:
本系列RocketMQ4.8註釋github地址,希望對大家有所幫助,要是覺得可以的話麻煩給點一下Star哈
producer
的訊息,處理類是SendMessageProcessor
,將訊息寫入到commigLog
檔案後,接收流程處理完畢;broker
處理訊息分發的類是ReputMessageService
,它會啟動一個執行緒,不斷地將commitLong
分到到對應的consumerQueue
,這一步操作會寫兩個檔案:consumerQueue
與indexFile
,寫入後,訊息分發流程處理 完畢;consumer
的流程,consumer
會發起獲取訊息的請求,broker
收到請求後,呼叫PullMessageProcessor
類處理,從consumerQueue
檔案獲取訊息,返回給consumer
後,投遞流程處理完畢。以上就是rocketMq
處理訊息的流程了,接下來我們就從原始碼來分析訊息投遞的實現。
與producer
不同,consumer
從broker
拉取訊息時,傳送的請求code
為PULL_MESSAGE
,processor
為PullMessageProcessor
,我們直接進入它的processRequest
方法:
@Override public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { // 呼叫方法 return this.processRequest(ctx.channel(), request, true); }
這個方法就只是呼叫了一個過載方法,多出來的引數true
表示允許broker
掛起請求,我們繼續,
/** * 繼續處理 */ private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException { RemotingCommand response = RemotingCommand .createResponseCommand(PullMessageResponseHeader.class); final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); response.setOpaque(request.getOpaque()); // 省略許可權校驗流程 // 1. rocketMq 可以設定校驗資訊,以阻擋非法使用者端的連線 // 2. 同時,對topic可以設定DENY(拒絕)、ANY(PUB 或者 SUB 許可權)、PUB(傳送許可權)、SUB(訂閱許可權)等許可權, // 可以細粒度控制使用者端對topic的操作內容 ... // 獲取訂閱組 SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager() .findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); ... // 獲取訂閱主題 TopicConfig topicConfig = this.brokerController.getTopicConfigManager() .selectTopicConfig(requestHeader.getTopic()); ... // 處理filter // consumer在訂閱訊息時,可以對訂閱的訊息進行過濾,過濾方法有兩種:tag與sql92 // 這裡我們重點關注拉取訊息的流程,具體的過濾細節後面再分析 ... // 獲取訊息 // 1. 根據 topic 與 queueId 獲取 ConsumerQueue 檔案 // 2. 根據 ConsumerQueue 檔案的資訊,從 CommitLog 中獲取訊息內容 final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage( requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); if (getMessageResult != null) { // 省略一大堆的校驗過程 ... switch (response.getCode()) { // 表示訊息可以處理,這裡會把訊息內容寫入到 response 中 case ResponseCode.SUCCESS: ... // 處理訊息訊息內容,就是把訊息從 getMessageResult 讀出來,放到 response 中 if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { final long beginTimeMills = this.brokerController.getMessageStore().now(); // 將訊息內容轉為byte陣列 final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); ... response.setBody(r); } else { try { // 訊息轉換 FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader( getMessageResult.getBufferTotalSize()), getMessageResult); channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { ... }); } catch (Throwable e) { ... } response = null; } break; // 未找到滿足條件的訊息 case ResponseCode.PULL_NOT_FOUND: // 如果支援掛起,就掛起當前請求 if (brokerAllowSuspend && hasSuspendFlag) { ... PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); // 沒有找到相關的訊息,掛起操作 this.brokerController.getPullRequestHoldService() .suspendPullRequest(topic, queueId, pullRequest); response = null; break; } // 省略其他型別的處理 ... break; default: assert false; } } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("store getMessage return null"); } ... return response; }
在原始碼中,這個方法也是非常長,這裡我抹去了各種細枝末節,僅留下了一些重要的流程,整個處理流程如下:
rocketMq
可以設定校驗資訊,以阻擋非法使用者端的連線,同時也可以設定使用者端的釋出、訂閱許可權,細節度控制存取許可權;broker
中對應的記錄consumer
在訂閱訊息時,可以對訂閱的訊息進行過濾,過濾方法有兩種:tag
與sql92
topic
與 queueId
獲取 ConsumerQueue
檔案,根據 ConsumerQueue
檔案的資訊,從 CommitLog
中獲取訊息內容,訊息的過濾操作也是發生在這一步reponse
中以上程式碼還是比較清晰的,相關流程程式碼中都作了註釋。
以上流程就是整個訊息的獲取流程了,在本文中,我們僅關注與獲取訊息相關的步驟,重點關注以下兩個操作:
獲取訊息的方法為DefaultMessageStore#getMessage
,程式碼如下:
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { // 省略一些判斷 ... // 根據topic與queueId一個ConsumeQueue,consumeQueue記錄的是訊息在commitLog的位置 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); if (...) { // 判斷 offset 是否符合要求 ... } else { // 從 consumerQueue 檔案中獲取訊息 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { ... for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 省略一大堆的訊息過濾操作 ... // 從 commitLong 獲取訊息 SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; } // 省略一大堆的訊息過濾操作 ... } } } else { status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } if (GetMessageStatus.FOUND == status) { this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet(); } else { this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet(); } long elapsedTime = this.getSystemClock().now() - beginTime; this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime); getResult.setStatus(status); // 又是處理 offset getResult.setNextBeginOffset(nextBeginOffset); getResult.setMaxOffset(maxOffset); getResult.setMinOffset(minOffset); return getResult; }
這個方法不是比較長的,這裡僅保留了關鍵流程,獲取訊息的關鍵流程如下:
topic
與queueId
找到ConsumerQueue
ConsumerQueue
對應的檔案中獲取訊息資訊,如tag
的hashCode
、訊息在commitLog
中的位置資訊commitLog
中獲取完整的訊息經過以上步驟,訊息就能獲取到了,不過在獲取訊息的前後,會進行訊息過濾操作,即根據tag
或sql
語法來過濾訊息,關於訊息過濾的一些細節,我們留到後面訊息過濾相關章節作進一步分析。
當broker
無新訊息時,consumer
拉取訊息的請求就會掛起,方法為PullRequestHoldService#suspendPullRequest
:
public class PullRequestHoldService extends ServiceThread { private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<String, ManyPullRequest>(1024); public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (null == mpr) { mpr = new ManyPullRequest(); ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr); if (prev != null) { mpr = prev; } } mpr.addPullRequest(pullRequest); } ... }
在suspendPullRequest
方法中,所做的工作僅是把當前請求放入pullRequestTable
中了。從程式碼中可以看到,pullRequestTable
是一個ConcurrentMap
,key
是 topic@queueId
,value
就是掛起的請求了。
請求掛起後,何時處理呢?這就是PullRequestHoldService
執行緒的工作了。
看完PullRequestHoldService#suspendPullRequest
方法後,我們再來看看PullRequestHoldService
。
PullRequestHoldService
是ServiceThread
的子類(上一次看到ServiceThread
的子類還是ReputMessageService
),它也會啟動一個新執行緒來處理掛起操作。
我們先來看看它是在哪裡啟動PullRequestHoldService
的執行緒的,在BrokerController
的啟動方法start()
中有這麼一行:
BrokerController#start
public void start() throws Exception { ... if (this.pullRequestHoldService != null) { this.pullRequestHoldService.start(); } ... }
這裡就是啟動pullRequestHoldService
的執行緒操作了。
為了探究這個執行緒做了什麼,我們進入PullRequestHoldService#run
方法:
@Override public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { // 等待中 if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning( this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); // 檢查操作 this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
從程式碼來看,這個執行緒先是進行等待,然後呼叫PullRequestHoldService#checkHoldRequest
方法,看來關注就是這個方法了,它的程式碼如下:
private void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); final long offset = this.brokerController.getMessageStore() .getMaxOffsetInQueue(topic, queueId); try { // 呼叫notifyMessageArriving方法操作 this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { log.error(...); } } } }
這個方法呼叫了PullRequestHoldService#notifyMessageArriving(...)
,我們繼續進入:
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) { // 繼續呼叫 notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null); } /** * 這個方法就是最終呼叫的了 */ public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { List<PullRequest> requestList = mpr.cloneListAndClear(); if (requestList != null) { List<PullRequest> replayList = new ArrayList<PullRequest>(); for (PullRequest request : requestList) { // 判斷是否有新訊息到達,要根據 comsumerQueue 的偏移量與request的偏移量判斷 long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { newestOffset = this.brokerController.getMessageStore() .getMaxOffsetInQueue(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); if (match && properties != null) { match = request.getMessageFilter().isMatchedByCommitLog(null, properties); } if (match) { try { // 喚醒操作 this.brokerController.getPullMessageProcessor() .executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } } // 超時時間到了 if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { // 喚醒操作 this.brokerController.getPullMessageProcessor() .executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } replayList.add(request); } if (!replayList.isEmpty()) { mpr.addPullRequest(replayList); } } } }
這個方法就是用來檢查是否有新訊息送達的操作了,方法雖然有點長,但可以用一句話來總結:如果有新訊息送達,或者pullRquest
hold
住的時間到了,就喚醒pullRquest
(即呼叫PullMessageProcessor#executeRequestWhenWakeup
方法)。
comsumerQueue
檔案中的最大偏移量,與當前pullRquest
中的偏移量進行比較,如果前者大,就表示有新訊息送達了,需要喚醒pullRquest
consumer
請求沒獲取到訊息時,broker
會hold
這個請求一段時間(30s),當這個時間到了,也會喚醒pullRquest
,之後就不會再hold
住它了我們再來看看 PullMessageProcessor#executeRequestWhenWakeup
方法:
public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { // 關注 Runnable#run() 方法即可 Runnable run = new Runnable() { @Override public void run() { try { // 再一次呼叫 PullMessageProcessor#processRequest(...) 方法 final RemotingCommand response = PullMessageProcessor.this .processRequest(channel, request, false); ... } catch (RemotingCommandException e1) { log.error("excuteRequestWhenWakeup run", e1); } } }; // 提交任務 this.brokerController.getPullMessageExecutor() .submit(new RequestTask(run, channel, request)); }
這個方法準備了一個任務,然後將其提交到執行緒池中執行,任務內容很簡單,僅是呼叫了PullMessageProcessor#processRequest(...)
方法,這個方法就是本節一始提到的處理consumer
拉取訊息的方法了。
在分析訊息分發流程時,DefaultMessageStore.ReputMessageService#doReput
方法中有這麼一段:
private void doReput() { ... // 分發訊息 DefaultMessageStore.this.doDispatch(dispatchRequest); // 長輪詢:如果有訊息到了主節點,並且開啟了長輪詢 if (BrokerRole.SLAVE != DefaultMessageStore.this .getMessageStoreConfig().getBrokerRole() &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){ // 呼叫NotifyMessageArrivingListener的arriving方法 DefaultMessageStore.this.messageArrivingListener.arriving( dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } ... }
這段就是用來主動喚醒hold
住的consumer
請求的,我們進入NotifyMessageArrivingListener#arriving
方法:
@Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties); }
最終它也是呼叫了 PullRequestHoldService#notifyMessageArriving(...)
方法。
本文主要分析了broker
處理PULL_MESSAGE
請求的流程,總結如下:
broker
處理PULL_MESSAGE
的processor
為PullMessageProcessor
,PullMessageProcessor
的processRequest(...)
就是整個訊息獲取流程了broker
在獲取訊息時,先根據請求的topic
與queueId
找到consumerQueue
,然後根據請求中的offset
引數從consumerQueue
檔案中找到訊息在commitLog
的位置資訊,最後根據位置資訊從commitLog
中獲取訊息內容broker
中沒有當前consumerQueue
的訊息,broker
會掛起當前執行緒,直到超時(預設30s)或收到新的訊息時再喚醒參考
以上就是RocketMQ broker 訊息投遞流程處理PULL_MESSAGE請求解析的詳細內容,更多關於RocketMQ broker 訊息投遞的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45