To improve the availability of message consumption, RocketMQ needs to avoid the situation where a single point of failure on a Broker leaves the messages on that Broker unconsumable, and also the situation where a damaged disk on a single machine causes consumption data to be lost.
RocketMQ therefore uses a Broker master-slave replication mechanism: once a message has been delivered to the Master server it is replicated to the Slave server, so if the Master server goes down, consumers can continue to pull messages from the Slave.
Messages are replicated from the Master server to the Slave server in one of two ways: synchronous replication (SYNC_MASTER) or asynchronous replication (ASYNC_MASTER).
This is configured in the configuration file conf/broker.conf:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
Set the brokerRole parameter accordingly:
Synchronous replication: success is returned to the client only after both the Master and the Slave have written the message.
Asynchronous replication: success is returned to the client as soon as the Master server has written the message.
In practice, the flush-to-disk mode and the master-slave replication mode need to be chosen to fit the business scenario. Synchronous flushing is not recommended, because it triggers disk writes so frequently that performance drops noticeably. **The usual setup is to configure both the Master and the Slave with asynchronous flushing and synchronous replication, which guarantees that no data is lost.** Even if one server fails, the data is still safe.
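As a minimal sketch of that recommended setup (the cluster and broker names below are illustrative, not taken from the article), the two broker.conf files might look like this:

# master: broker-a.conf (names are illustrative)
brokerClusterName = DefaultCluster
brokerName = broker-a
# brokerId = 0 marks the master
brokerId = 0
# wait for the slave before acknowledging the client, but flush to disk asynchronously
brokerRole = SYNC_MASTER
flushDiskType = ASYNC_FLUSH

# slave: broker-a-s.conf (names are illustrative)
brokerClusterName = DefaultCluster
# the slave shares the master's brokerName
brokerName = broker-a
# a non-zero brokerId marks a slave
brokerId = 1
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH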
Read-write separation is a common design in high-performance, high-availability architectures. MySQL, for example, implements it: clients can only write data through the Master server, but can read data from both the Master and the Slave servers.
When a RocketMQ Consumer pulls messages, the Broker uses the Master server's message backlog to decide whether the Consumer should pull from a Slave server instead. By default the Consumer starts by pulling from the Master server; if the Master's message backlog exceeds 40% of physical memory, the Broker returns the messages to the Consumer together with a hint to pull from a Slave server next time.
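To make the 40% threshold concrete, here is a standalone sketch of the calculation with made-up numbers; it mirrors the store logic shown later in this article but is not RocketMQ code itself:

public class BacklogThresholdDemo {
    public static void main(String[] args) {
        long totalPhysicalMemory = 16L * 1024 * 1024 * 1024; // e.g. 16 GB of RAM
        int accessMessageInMemoryMaxRatio = 40;              // default ratio, in percent

        long maxOffsetPy = 52L * 1024 * 1024 * 1024;         // current max physical offset
        long maxPhyOffsetPulling = 45L * 1024 * 1024 * 1024; // max physical offset of this pull

        long backlog = maxOffsetPy - maxPhyOffsetPulling;    // 7 GB of not-yet-pulled messages
        long threshold = (long) (totalPhysicalMemory * (accessMessageInMemoryMaxRatio / 100.0)); // 6.4 GB

        // backlog (7 GB) > threshold (6.4 GB), so the broker would suggest pulling from a slave
        System.out.println("suggestPullingFromSlave = " + (backlog > threshold));
    }
}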
RocketMQ has its own read-write separation logic: the master server's message backlog determines whether the consumer pulls its next messages from a slave server.
When the Consumer sends a pull request to the Broker, it decides, based on the message queue it has selected, whether to pull from the Master or from a Slave; the default is the Master.
When the Broker receives the pull request, it obtains the local message backlog and checks whether it exceeds a certain fraction of physical memory. If it does, the Broker marks the next pull to be served by a Slave, computes that Slave's broker id, and returns it to the consumer in the response.
After receiving the Broker's response, the Consumer associates the message queue with the suggested broker id for the next pull and caches it in memory, so that it knows which node to send the next pull request to.
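All of this is invisible to application code. A minimal push-consumer sketch, assuming a local NameServer at 127.0.0.1:9876 and a hypothetical topic and group name, needs no slave-related configuration at all; the broker hint is handled inside the client:

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876"); // assumed NameServer address
        consumer.subscribe("DemoTopic", "*");      // hypothetical topic
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // whether these messages came from the master or a slave is decided by the
                // broker hint cached in the client, not by anything in this listener
                System.out.printf("received %d message(s)%n", msgs.size());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

Whether a given pull is ultimately served by the master or a slave is decided by the cached suggestion described in the rest of this article.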
public class GetMessageResult {

    private final List<SelectMappedBufferResult> messageMapedList =
        new ArrayList<SelectMappedBufferResult>(100);
    private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);

    private GetMessageStatus status;
    private long nextBeginOffset;
    private long minOffset;
    private long maxOffset;

    private int bufferTotalSize = 0;

    // indicates whether the next pull should go to a slave
    private boolean suggestPullingFromSlave = false;

    private int msgCount4Commercial = 0;
}

// When the message backlog grows too large, the query is switched to the slave.
// maxOffsetPy is the current maximum physical offset and maxPhyOffsetPulling is the maximum
// physical offset of this pull; their difference is the message backlog.
// TOTAL_PHYSICAL_MEMORY_SIZE is the machine's physical memory and accessMessageInMemoryMaxRatio
// defaults to 40, so the code below checks whether the backlog exceeds 40% of physical memory;
// if it does, suggestPullingFromSlave is set to true.
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
In the GetMessageResult class, suggestPullingFromSlave defaults to false, so by default the consumer does not read from the slave server. The value is changed dynamically while the Broker receives and processes the consumer's pull request: maxPhyOffsetPulling is the maximum physical offset of this pull, the difference between it and the current maximum physical offset represents the message backlog, and if that backlog exceeds 40% of physical memory the query is switched to the Slave.

public class PullMessageResponseHeader implements CommandCustomHeader {
    // suggestWhichBrokerId tells the consumer which broker to query next time
    private Long suggestWhichBrokerId;
    private Long nextBeginOffset;
    private Long minOffset;
    private Long maxOffset;
}

public class PullMessageProcessor implements NettyRequestProcessor {

    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
        response.setOpaque(request.getOpaque());

        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),
                requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(),
                requestHeader.getMaxMsgNums(), messageFilter);
        if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

            // suggest consuming from the slave
            if (getMessageResult.isSuggestPullingFromSlave()) {
                // query the slave
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                // query the master
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    // if this SLAVE is not readable, fall back to reading from the MASTER
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }

            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // consume too slow, redirect to another machine
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                }
                // consume ok
                else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
        }
        return response;
    }
}
In PullMessageResponseHeader, the suggestWhichBrokerId field tells the consumer which brokerId a given MessageQueue should be queried from next time.
If the Slave is not readable, it is set back to MASTER_ID so that the query goes to the Master.
public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        // record which brokerId the next pull for this MessageQueue should go to
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

        // related code omitted
        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
        // cache the suggestion in pullFromWhichNodeTable
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }
}
After the Consumer receives the pull response, it caches the brokerId suggested for the next pull.
public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // determine which node of this brokerName the MessageQueue should be queried from
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion()
                        + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

    public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }

        // use the cached suggestion if there is one, otherwise default to the master
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }

        return MixAll.MASTER_ID;
    }
}
When the Consumer pulls messages, it takes the brokerId out of pullFromWhichNodeTable to determine which broker node to send the query to.
This concludes this article on master-slave replication and read-write separation in RocketMQ's design. For more on RocketMQ replication and read-write separation, please search it145.com's earlier articles, and we hope you will continue to support it145.com!