<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
就是說,我們打了個比方,把RocketMQ比作碼頭上的一個小房子,來送孩子登船的家長比作生產者,拉走孩子們的船伕比作消費者,所以,RocketMQ的故事就這麼展開了。
這節我們研究研究,訊息的傳送流程。也就是說,訊息孩子從進門到坐到message queue座位上都經歷了啥。
父母把訊息孩子送到碼頭之後,門口的門童defaultMQProducerImpl.send()
接過孩子,進入到MQ房子內部,然後引導孩子進入Broker
候船大廳內的message queue
座位上就坐。這就是訊息傳送的流程了。
而且孩子在剛被門童接到之後,就被規定了能在候船大廳待多久,預設是3秒。也就是說,要是再小房子內等了三秒沒走,就離開吧,你怕是沒想明白自己來幹啥的。這就是訊息的超時時間。
public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}
這裡看看這幾個引數,
這裡訊息孩子進到第一個卡口,先要檢查送孩子來的家長是否還能聯絡上,若是能聯絡到,就繼續。要是聯絡不到,這孩子豈不是被拋棄了,不敢接不敢接,送到孤兒院吧。
然後需要檢查訊息孩子了,首先是檢查孩子還在不在,別扔個衣服跑了。
然後看看孩子指定的這個topic,不能說我想去內個topic哈,必須是實實在在的名字。而且上頭也規定了,這個topic的名字也不能太長,也不能包含特殊字元。已有的一些領導定過的也不能用哈。
接下來就是檢查孩子的body了,之前說body就是孩子的技能,首先,技能為空,不行不行,啥都不會是不行的。再者太長也不行,你唱首歌兩年,這沒法玩。
檢查message不為null
檢查topic
檢查話題的名字是否被系統已佔用
檢查body
下邊我們看看sendDefaultImpl這個方法。給他拆成一段一段的看。
//校驗生產者服務是ok的,可以聯絡到的 this.makeSureStateOK(); //校驗訊息的引數 Validators.checkMessage(msg, this.defaultMQProducer);
private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // 這裡校驗Topic的時候,校驗了不能為空,長度和特殊字元 Validators.checkTopic(msg.getTopic()); //這裡則校驗了一些不允許使用的topic名字 Validators.isNotAllowedSendTopic(msg.getTopic()); // body不為空 if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } // body長度不為0 if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } // body 長度不能過長 if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } }
嗯,這裡孩子終於通過了檢查,服務人員開始帶著他去找自己指定的topic區域,指定是自己指定,劃分還是工作人員劃分的。咱總得知道這個topic區域在哪吧。
先去快取筆記裡找,有沒有這個區域的資訊,若是沒有這個topic,就新建一個,然後更新到快取筆記裡邊。若有topic但是不知道在哪,就找name server
大腦去申請這個topic在哪的資訊。
執行tryToFindTopicPublishInfo方法去獲取Topic的路由資訊,若是不存在就新建,若是有topic但是快取中沒有路由資訊,則通過name server獲取路由資訊。
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { //獲取topic資訊 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //不存在 if (null == topicPublishInfo || !topicPublishInfo.ok()) { //新建 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //修改topic的路由資訊並更新到本地 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } //包含路由資訊就直接返回 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { //不包含路由資訊則向name server申請,修改topic的路由資訊並更新到本地 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
這就是計算訊息孩子可以嘗試去找地方坐幾次,沒坐上,欸,我又來了,沒坐上,欸,我又來了。
這行程式碼就是計算重試次數的,根據communicationMode
傳入的值,同步非同步還是單向的來決定重試次數是幾次。 很明顯,若是同步的,就會嘗試三次。若是非同步的或者單向的就只傳送一次。
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
我們之前說了,Broker類似於候船大廳,為了均分壓力,每次都要進與上次不同的候船大廳。
執行selectOneMessageQueue
方法通過Queue將訊息傳送到與上次不同的一個Broker。也可以通過 sendLatencyFaultEnable判斷是否啟用延遲容錯開關
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
這就是走過巷道坐到屬於自己的座位上了
然後就通過sendKernelImpl
傳送訊息了,這是傳送訊息的核心方法。會準備通訊層的入參,並將請求傳送給通訊層,內部實現是基於Netty
的。
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
以上就是RocketMQ訊息傳送流程原始碼剖析的詳細內容,更多關於RocketMQ訊息傳送流程的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45