首頁 > 軟體

RocketMQ設計之主從複製和讀寫分離

2022-03-21 13:02:00

一、主從複製

RocketMQ為了提高消費的高可用性,避免Broker發生單點故障引起Broker上的訊息無法及時消費,同時避免單個機器上硬碟壞損出現消費資料丟失。

RocketMQ採用Broker資料主從複製機制,當訊息傳送到Master伺服器後會將訊息同步到Slave伺服器,如果Master伺服器宕機,訊息消費者還可以繼續從Slave拉取訊息。

訊息從Master伺服器複製到Slave伺服器上,有兩種複製方式:同步複製SYNC_MASTER和非同步複製ASYNC_MASTER

通過組態檔conf/broker.conf檔案設定:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

對brokerRole引數進行設定:

同步複製:Master和Slave都寫成功後才返回使用者端寫成功的狀態。

  • 優點:Master伺服器出現故障,Slave伺服器上有全部資料的備份,很容易恢復到Master伺服器。
  • 缺點:由於多了一個同步等待的步驟,增加資料寫入延遲,降低系統吞吐量。

非同步複製:僅Master伺服器寫成功即可返回給使用者端寫成功的狀態。

  • 優點:沒有同步等待的步驟,低延遲,高吞吐。
  • 缺點:如果Master伺服器出現故障,有些資料可能未寫入Slave伺服器,未同步的資料可能丟失

實際應用中,需要結合業務場景,合理設定刷盤方式和主從複製方式。不建議使用同步刷盤方式,因為它頻繁觸發寫磁碟操作,效能下降很明顯。**通常把MasterSlave設定為非同步刷盤,同步複製,保證資料不丟失。**這樣即使一臺伺服器出故障,仍然可以保證資料不丟失。

二、讀寫分離

讀寫分離機制是高效能、高可用架構中常見的設計,例如Mysql實現讀寫分離機制,Client只能從Master伺服器寫資料,可以從Master伺服器和Slave伺服器都讀資料。

RocketMQ的Consumer在拉取訊息時,Broker會判斷Master伺服器的訊息堆積量來決定Consumer是否從Slave伺服器拉取訊息消費。預設一開始從Master伺服器拉群訊息,如果Master伺服器的訊息堆積超過實體記憶體40%,則會返回給Consumer的訊息結果並告知Consumer,下次從其他Slave伺服器上拉取訊息。

RocketMQ 有屬於自己的一套讀寫分離邏輯,會判斷主伺服器的訊息堆積量來決定消費者是否向從伺服器拉取訊息消費。

Consumer在向 Broker 傳送訊息拉取請求時,會根據篩選出來的訊息佇列,判定是從Master,還是從Slave拉取訊息,預設是Master。

Broker 接收到訊息消費者拉取請求,在獲取本地堆積的訊息量後,會計算伺服器的訊息堆積量是否大於實體記憶體的一定值,如果是,則標記下次從 Slave伺服器拉取,計算 Slave伺服器的 Broker Id,並響應給消費者。

Consumer在接收到 Broker的響應後,會把訊息佇列與建議下一次拉取節點的 Broker Id 關聯起來,並快取在記憶體中,以便下次拉取訊息時,確定從哪個節點傳送請求。

public class GetMessageResult {

    private final List<SelectMappedBufferResult> messageMapedList =
        new ArrayList<SelectMappedBufferResult>(100);
    private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
    private GetMessageStatus status;
    private long nextBeginOffset;
    private long minOffset;
    private long maxOffset;
    private int bufferTotalSize = 0;
    // 標識是否通過Slave拉拉取訊息
    private boolean suggestPullingFromSlave = false;
    private int msgCount4Commercial = 0;
}

// 針對訊息堆積量過大會切換到Slave進行查詢。
// maxOffsetPy 為當前最大物理偏移量,maxPhyOffsetPulling 為本次訊息拉取最大物理偏移量,他們的差即可表示訊息堆積量。
// TOTAL_PHYSICAL_MEMORY_SIZE 表示當前系統實體記憶體,accessMessageInMemoryMaxRatio 的預設值為 40,
// 以上邏輯即可算出當前訊息堆積量是否大於實體記憶體的 40%,如果大於則將 suggestPullingFromSlave 設定為 true。

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
  • 決定消費者是否向從伺服器拉取訊息消費的值存在 GetMessageResult 類中。
  • suggestPullingFromSlave的預設值為 false,即預設消費者不會消費從伺服器,但它會在消費者傳送訊息拉取請求時,動態改變該值,Broker 接收、處理消費者拉取訊息請求。
  • 針對本MessageQueue訊息堆積量過大會切換到Slave進行查詢,maxOffsetPy 為當前最大物理偏移量,maxPhyOffsetPulling 為本次訊息拉取最大物理偏移量,他們的差即可表示訊息堆積量,當前訊息堆積量是否大於實體記憶體的 40%就會切換到Slave進行查詢。
public class PullMessageResponseHeader implements CommandCustomHeader {
    // suggestWhichBrokerId標識從哪個broker進行查詢
    private Long suggestWhichBrokerId;
    private Long nextBeginOffset;
    private Long minOffset;
    private Long maxOffset;
}


public class PullMessageProcessor implements NettyRequestProcessor {

    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

        if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

            // 建議從slave消費訊息
            if (getMessageResult.isSuggestPullingFromSlave()) {
                // 從slave查詢
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                // 從master查詢
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    // 針對SLAVE需要判斷是否可讀,不可讀的情況下讀MASTER
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }

            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // consume too slow ,redirect to another machine
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                }
                // consume ok
                else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
        }

        return response;
    }
}

PullMessageResponseHeadersuggestWhichBrokerId標識某個MessageQueue的訊息從具體的brokerId進行查詢。
針對Slave不可讀的情況會設定為從MASTER_ID進行查詢。

public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        // 處理MessageQueue對應拉取的brokerId
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

        // 省略相關程式碼

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
        // 儲存在pullFromWhichNodeTable物件中
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }
}

Consumer收到拉取響應回來的資料後,會將下次建議拉取的 brokerId快取起來。

public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        // 查詢MessageQueue應該從brokerName的哪個節點查詢
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);

        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }


    public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }

        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }

        return MixAll.MASTER_ID;
    }
}

Consumer拉取訊息的時候會從 pullFromWhichNodeTable 中取出拉取 brokerId確定去具體的broker進行查詢。

到此這篇關於RocketMQ設計之主從複製和讀寫分離的文章就介紹到這了,更多相關RocketMQ從複製和讀寫分離內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com