<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
前邊兩章說了點基礎的,從這章開始,我們挖挖原始碼。看看RocketMQ是怎麼工作的。
首先呢,這個生產者就是送孩子去碼頭的家長,孩子們呢,就是訊息了。
我們看看訊息孩子們都長啥樣。
public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主題名字 private String topic; //訊息擴充套件資訊,Tag,keys,延遲級別都存在這裡 private Map<String, String> properties; //訊息體,位元組陣列 private byte[] body; //設定訊息的key, public void setKeys(String keys) {} //設定topic public void setTopic(String topic) {} //延遲級別 public int setDelayTimeLevel(int level) {} //訊息過濾的標記 public void setTags(String tags) {} //擴充套件資訊存放在此 public void putUserProperty(final String name, final String value) {} }
訊息就是孩子們,這些孩子們呢,有各自的特點,也有共性。同一個家長送來的兩個孩子可以是去同一個地方的,也可以是去不同的地方的。
首先呢,每個孩子訊息都有一個屬性topic,這個我們上文說到了,是一個候船大廳。孩子們進來之後,走到自己指定的候船大廳的指定區域(平時出門坐火車高鐵不也是指定的站臺乘車麼),坐到message queue座位上等,等著出行。
Broker有一個或者多個topic,訊息會存放到topic內的message queue內,等待被消費。
孩子訊息,也有一個Body屬性,這就是他的能力,他會畫畫,他會唱歌,他會幹啥幹啥,就記錄在這個Body屬性裡。等走出去了,體現價值的地方也是這個Body屬性。
Body就是訊息體,消費者會根據訊息體執行對應的操作。
這個tag我們上節說了,就是一個標記,有的孩子揹著畫板,相機,有的遊船就特意找到這些孩子拉走,完成他們的任務。
可以給訊息設定tag屬性,消費者可以選擇含有特定tag屬性的訊息進行消費。
key就是每個孩子訊息的名字了。要找哪個孩子,喊他名就行。
對傳送的訊息設定好 Key,以後可以根據這個Key 來查詢訊息。比如訊息異常,訊息丟失,進行查詢會很方便。
當然,還有的孩子來就不急著走,來之前就想好了,要恰個飯,得30分鐘,所以自己來了會等30分鐘後被接走。
設定延遲級別可以規定多久後訊息可以被消費。
每個送孩子來的家長都希望能送到候船大廳裡,更不希望孩子被搞丟了,這個時候這個候船大廳就需要一些保證機制了。
就是說家長送來了,孩子進到候船大廳之後,沒能成功坐到message queue座位上,這個時候工作人員會安排重試,再去看是否有座位坐。重試次數預設是2次,也就是說,訊息孩子共有3次找座位坐的機會。
看原始碼,我特意加了註解,大致可以看懂一些了。
//這裡取到了重試的次數 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //獲取訊息佇列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //傳送訊息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException丟擲了異常,其他的exception都會繼續重試 throw e; } } else { break; } }
重試程式碼如上,這個sendDefaultImpl
方法中,會嘗試傳送三次訊息,若是都失敗,才會丟擲對應的錯誤。
若是有多個Broker候車大廳的時候,服務人員會安排訊息孩子選擇一個相對不擁擠,比較容易進入的來進入。當然那些已經關閉的,停電的,沒有服務能力的,我們是不會進的。
MQ Client會維護一個Broker的傳送延遲資訊,根據這個資訊會選擇一個相對延遲較低的Broker來傳送訊息。會主動剔除哪些已經宕機,不可用或傳送延遲級別較高的Broker.
選擇Broker
就是在選擇message queue
,對應的程式碼如下:
這裡會先判斷延遲容錯開關是否開啟,這個開關預設是關閉的,若是開啟的話,會優先選擇延遲較低的Broker。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判斷傳送延遲容錯開關是否開啟 if (this.sendLatencyFaultEnable) { try { //選擇一個延遲上可以接受,並且和上次傳送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延遲時間可以接受,則返回這個Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步沒能選中一個Broker,就選擇一個延遲較低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前邊都沒選中一個Broker,就隨機選一個Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
但是當延遲容錯開關為關閉狀態的時候,執行的程式碼如下:
為了均勻分散Broker的壓力,會選擇與之前不同的Broker。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是沒有上次的Brokername做參考,就隨機選一個 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那麼就選一個其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //這裡判斷遇上一個使用的Broker不是同一個 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上邊的都沒選中,那麼就隨機選一個 return selectOneMessageQueue(); } }
Broker候船大廳為了能確切的接收到訊息孩子,至少會有兩個廳,一個主廳一個副廳,一般來說孩子都會進入到主廳,然後一頓操作,卡該忙信那機資(影分身之術),然後讓分身進入到副廳,這樣當主廳停電了,不工作了,副廳的分身只要去完成了任務就ok的。一般來說都是主廳的訊息孩子去坐船完成任務。
之後我們會聊到Broker的主從複製,分為同步複製和非同步複製,同步複製時指當master 收到訊息之後,同步到slaver才算訊息傳送成功。非同步複製是隻要master收到訊息就算成功。生產中建議至少部署兩臺master和兩臺slaver。
下一篇,我們聊聊,訊息的傳送流程,就是說,一個訊息孩子,從進碼頭的門到坐到message queue座位上,都經歷了啥。
以上就是java開發RocketMQ生產者高可用範例詳解的詳細內容,更多關於java RocketMQ生產者高可用的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45