首頁 > 軟體

Nacos使用者端設定中心快取動態更新實現原始碼

2022-03-31 13:00:03

Nacos 作為設定中心,當應用程式去存取Nacos動態獲取設定源之後,會快取到本地記憶體以及磁碟中。
由於Nacos作為動態設定中心,意味著後續設定變更之後需要讓所有相關的使用者端感知,並更新本地記憶體!

那麼這個功能是在哪裡實現的呢? 以及它是採用什麼樣的方式來實現設定的更新的呢? 我們一起來探索一下原始碼的實現!

使用者端設定快取更新

當用戶端拿到設定後,需要動態重新整理,從而保證資料和伺服器端是一致的,這個過程是如何實現的呢?在這一小節中我們來做一個詳細分析。

Nacos採用長輪訓機制來實現資料變更的同步,原理如下!

整體工作流程如下:

  • 使用者端發起長輪訓請求
  • 伺服器端收到請求以後,先比較伺服器端快取中的資料是否相同,如果不通,則直接返回
  • 如果相同,則通過schedule延遲29.5s之後再執行比較
  • 為了保證當伺服器端在29.5s之內發生資料變化能夠及時通知給使用者端,伺服器端採用事件訂閱的方式來監聽伺服器端本地資料變化的事件,一旦收到事件,則觸發DataChangeTask的通知,並且遍歷allStubs佇列中的ClientLongPolling,把結果寫回到使用者端,就完成了一次資料的推播
  • 如果 DataChangeTask 任務完成了資料的 “推播” 之後,ClientLongPolling 中的排程任務又開始執行了怎麼辦呢?
    很簡單,只要在進行 “推播” 操作之前,先將原來等待執行的排程任務取消掉就可以了,這樣就防止了推播操作寫完響應資料之後,排程任務又去寫響應資料,這時肯定會報錯的。所以,在ClientLongPolling方法中,最開始的一個步驟就是刪除訂閱事件

長輪訓任務啟動入口

在NacosConfigService的構造方法中,當這個類被範例化以後,有做一些事情

  • 初始化一個HttpAgent,這裡又用到了裝飾起模式,實際工作的類是ServerHttpAgent, MetricsHttpAgent內部也是呼叫了ServerHttpAgent的方法,增加了監控統計的資訊
  • ClientWorker, 使用者端的一個工作類,agent作為引數傳入到clientworker,可以基本猜測到裡面會用到agent做一些遠端相關的事情
public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = Constants.ENCODE;
    } else {
        this.encode = encodeTmp.trim();
    }
    initNamespace(properties); //
    this.configFilterChainManager = new ConfigFilterChainManager(properties);
    //初始化網路通訊元件
    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start(); 
    //初始化ClientWorker
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}

ClientWorker

在上述初始化程式碼中,我們重點需要關注ClientWorker這個類,它的構造方法如下

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager; //初始化設定過濾管理器
    // Initialize the timeout parameter
    init(properties); //初始化設定
    //初始化一個定時排程的執行緒池,重寫了threadfactory方法
    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
     //初始化一個定時排程的執行緒池,從裡面的name名字來看,似乎和長輪訓有關係。而這個長輪訓應該是和nacos伺服器端的長輪訓
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });
    //設定定時任務的執行頻率,並且呼叫checkConfigInfo這個方法,猜測是定時去檢測設定是否發生了變化
        //首次執行延遲時間為1毫秒、延遲時間為10毫秒
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

可以看到 ClientWorker 除了將 HttpAgent 維持在自己內部,還建立了兩個執行緒池:

第一個執行緒池是隻擁有一個執行緒用來執行定時任務的 executor,executor 每隔 10ms 就會執行一次 checkConfigInfo() 方法,從方法名上可以知道是每 10 ms 檢查一次設定資訊。

第二個執行緒池是一個普通的執行緒池,從 ThreadFactory 的名稱可以看到這個執行緒池是做長輪詢的。

checkConfigInfo

ClientWorker構造初始化中,啟動了一個定時任務去執行checkConfigInfo()方法,這個方法主要是定時檢查本地設定和伺服器上的設定的變更情況,這個方法定義如下.

public void checkConfigInfo() {
    // Dispatch tasks.
    int listenerSize = cacheMap.size(); //
    // Round up the longingTaskCount.
     // 向上取整為批數,監聽的設定數量除以3000,得到一個整數,代表長輪訓任務的數量
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
     //currentLongingTaskCount表示當前的長輪訓任務數量,如果小於計算的結果,則可以繼續建立
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

這個方法主要的目的是用來檢查伺服器端的設定資訊是否發生了變化。如果有變化,則觸發listener通知

cacheMap: AtomicReference<Map<String, CacheData>> cacheMap 用來儲存監聽變更的快取集合。key是根據dataID/group/tenant(租戶) 拼接的值。Value是對應儲存在nacos伺服器上的組態檔的內容。

預設情況下,每個長輪訓LongPullingRunnable任務預設處理3000個監聽設定集。如果超過3000, 則需要啟動多個LongPollingRunnable去執行。

currentLongingTaskCount儲存已啟動的LongPullingRunnable任務數

executorService就是在ClientWorker構造方法中初始化的執行緒池

LongPollingRunnable.run

LongPollingRunnable長輪訓任務的實現邏輯,程式碼比較長,我們分段來分析。

第一部分主要有兩個邏輯

  • 對任務按照批次分類
  • 檢查當前批次的快取和本地檔案的資料是否一致,如果發生了變化,則觸發監聽。
class LongPollingRunnable implements Runnable {
    private final int taskId; //表示當前任務批次id
    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }
    @Override
    public void run() {
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // 遍歷CacheMap,把CacheMap中和當前任務id相同的快取,儲存到cacheDatas
            // 通過checkLocalConfig方法
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) { //這裡表示資料有變化,需要通知監聽器
                            cacheData.checkListenerMd5(); //通知所有針對當前設定設定了監聽的監聽器
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }
           //省略部分
        } catch (Throwable e) {
            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); //出現異常,到下一次taskPenaltyTime後重新執行任務
        }
    }
}

checkLocalConfig

檢查本地設定,這裡面有三種情況

  • 如果isUseLocalConfigInfo為false,表示不使用本地設定,但是本地快取路徑的檔案是存在的,於是把isUseLocalConfigInfo設定為true,並且更新cacheData的內容以及檔案的更新時間
  • 如果isUseLocalConfigInfo為true,表示使用本地組態檔,但是本地快取檔案不存在,則設定為false,不通知監聽器。
  • 如果isUseLocalConfigInfo為true,並且本地快取檔案也存在,但是快取的的時間和檔案的更新時間不一致,則更新cacheData中的內容,並且isUseLocalConfigInfo設定為true。
private void checkLocalConfig(CacheData cacheData) {
    final String dataId = cacheData.dataId;
    final String group = cacheData.group;
    final String tenant = cacheData.tenant;
    File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
    // 沒有 -> 有
    if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cacheData.setEncryptedDataKey(encryptedDataKey);
        
        LOGGER.warn(
                "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        return;
    }
     // 有 -> 沒有。不通知業務監聽器,從server拿到設定後通知。
    // If use local config info, then it doesn't notify business listener and notify after getting from server.
    if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
        cacheData.setUseLocalConfigInfo(false);
        LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
                dataId, group, tenant);
        return;
    }
     // 有變更
    if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path
            .lastModified()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cacheData.setEncryptedDataKey(encryptedDataKey);
        LOGGER.warn(
                "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
    }
}

checkListenerMd5

遍歷使用者自己新增的監聽器,如果發現資料的md5值不同,則傳送通知

void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, wrap);
        }
    }
}

檢查伺服器端設定

在LongPollingRunnable.run中,先通過本地設定的讀取和檢查來判斷資料是否發生變化從而實現變化的通知

接著,當前的執行緒還需要去遠端伺服器上獲得最新的資料,檢查哪些資料發生了變化

  • 通過checkUpdateDataIds獲取遠端伺服器上資料變更的dataid
  • 遍歷這些變化的集合,然後呼叫getServerConfig從遠端伺服器獲得對應的內容
  • 更新原生的cache,設定為伺服器端返回的內容
  • 最後遍歷cacheDatas,找到變化的資料進行通知
// check server config
//從伺服器端獲取發生變化的資料的DataID列表,儲存在List<String>集合中
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
    LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
//遍歷發生了變更的設定項
for (String groupKey : changedGroupKeys) {
    String[] key = GroupKey.parseKey(groupKey);
    String dataId = key[0];
    String group = key[1];
    String tenant = null;
    if (key.length == 3) {
        tenant = key[2];
    }
    try {
        //逐項根據這些設定項獲取設定資訊
        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
        //把設定資訊儲存到CacheData中
        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
        cache.setContent(response.getContent());
        cache.setEncryptedDataKey(response.getEncryptedDataKey());
        if (null != response.getConfigType()) {
            cache.setType(response.getConfigType());
        }
        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                    ContentUtils.truncateContent(response.getContent()), response.getConfigType());
    } catch (NacosException ioe) {
        String message = String
            .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                    agent.getName(), dataId, group, tenant);
        LOGGER.error(message, ioe);
    }
}
//再遍歷CacheData這個集合,找到發生變化的資料進行通知
for (CacheData cacheData : cacheDatas) {
    if (!cacheData.isInitializing() || inInitializingCacheList
        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
        cacheData.checkListenerMd5();
        cacheData.setInitializing(false);
    }
}
inInitializingCacheList.clear();
 //繼續傳遞當前執行緒進行輪詢
executorService.execute(this);

checkUpdateDataIds

這個方法主要是向伺服器端發起檢查請求,判斷自己原生的設定和伺服器端的設定是否一致。

  • 首先從cacheDatas集合中找到isUseLocalConfigInfo為false的快取
  • 把需要檢查的設定項,拼接成一個字串,呼叫checkUpdateConfigStr進行驗證
/**
 * 從Server獲取值變化了的DataID列表。返回的物件裡只有dataId和group是有效的。 保證不返回NULL。
 */
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) { //把需要檢查的設定項,拼接成一個字串
        if (!cacheData.isUseLocalConfigInfo()) { //找到isUseLocalConfigInfo=false的快取
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {//
                // cacheData 首次出現在cacheMap中&首次check更新
                inInitializingCacheList
                    .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}

checkUpdateConfigStr

從Server獲取值變化了的DataID列表。返回的物件裡只有dataId和group是有效的。 保證不返回NULL。

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    //拼接引數和header
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    headers.put("Long-Pulling-Timeout", "" + timeout);
    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }
    if (StringUtils.isBlank(probeUpdateString)) {//判斷可能發生變更的字串是否為空,如果是,則直接返回。
        return Collections.emptyList();
    }
    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.
        // 設定readTimeoutMs,也就是本次請求等待響應的超時時間,預設是30s
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        //發起遠端呼叫
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);
        if (result.ok()) { //如果響應成功
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.getData()); //解析並更新資料,返回的是確實發生了資料變更的字串:tenant/group/dataid。
        } else {//如果響應失敗
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                    result.getCode());
        }
    } catch (Exception e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}

使用者端快取設定長輪訓機制總結

整體實現的核心點就一下幾個部分

對本地快取的設定做任務拆分,每一個批次是3000條

針對每3000條建立一個執行緒去執行

先把每一個批次的快取和本地磁碟檔案中的資料進行比較,

  • 如果和本地設定不一致,則表示該快取發生了更新,直接通知使用者端監聽
  • 如果本地快取和磁碟資料一致,則需要發起遠端請求檢查設定變化

先以tenent/groupId/dataId拼接成字串,傳送到伺服器端進行檢查,返回發生了變更的設定

使用者端收到變更設定列表,再逐項遍歷傳送到伺服器端獲取設定內容。

伺服器端設定更新的推播

分析完使用者端之後,隨著好奇心的驅使,伺服器端是如何處理使用者端的請求的?那麼同樣,我們需要思考幾個問題

  • 伺服器端是如何實現長輪訓機制的
  • 使用者端的超時時間為什麼要設定30s

使用者端發起的請求地址是:/v1/cs/configs/listener,於是找到這個介面進行檢視,程式碼如下。

//# ConfigController.java
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
    Map<String, String> clientMd5Map;
    try {
        //解析使用者端傳遞過來的可能發生變化的設定專案,轉化為Map集合(key=dataId,value=md5)
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    // 開始執行長輪訓。
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

doPollingConfig

這個方法主要是用來做長輪訓和短輪詢的判斷

  • 如果是長輪訓,直接走addLongPollingClient方法
  • 如果是短輪詢,直接比較伺服器端的資料,如果存在md5不一致,直接把資料返回。
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
        Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
    // 判斷當前請求是否支援長輪訓。()
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }
    //如果是短輪詢,走下面的請求,下面的請求就是把使用者端傳過來的資料和伺服器端的資料逐項進行比較,儲存到changeGroups中。
    // Compatible with short polling logic.
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
    // Compatible with short polling result.
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);
    
    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);
    // Before 2.0.4 version, return value is put into header.
    if (versionNum < START_LONG_POLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }
    Loggers.AUTH.info("new content:" + newResult);
    // Disable cache.
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}

addLongPollingClient

把使用者端的請求,儲存到長輪訓的執行引擎中。

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
        int probeRequestSize) {
    //獲取使用者端長輪訓的超時時間
    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); 
    //不允許斷開的標記
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    //應用名稱
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    //
    String tag = req.getHeader("Vipserver-Tag");
    //延期時間,預設為500ms
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
    // 提前500ms返回一個響應,避免使用者端出現超時
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // Do nothing but set fix polling timeout.
    } else {
        long start = System.currentTimeMillis();
        //通過md5判斷使用者端請求過來的key是否有和伺服器端有不一致的,如果有,則儲存到changedGroups中。
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) { //如果發現有變更,則直接把請求返回給使用者端
            generateResponse(req, rsp, changedGroups);
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { //如果noHangUpFlag為true,說明不需要掛起使用者端,所以直接返回。
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        }
    }
    //獲取請求端的ip
    String ip = RequestUtil.getRemoteIp(req);
    // Must be called by http thread, or send response.
    //把當前請求轉化為一個非同步請求(意味著此時tomcat執行緒被釋放,也就是使用者端的請求,需要通過asyncContext來手動觸發返回,否則一直掛起)
    final AsyncContext asyncContext = req.startAsync();
    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L); //設定非同步請求超時時間,
    //執行長輪訓請求
    ConfigExecutor.executeLongPolling(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

ClientLongPolling

接下來我們來分析一下,clientLongPolling到底做了什麼操作。或者說我們可以先猜測一下應該會做什麼事情

  • 這個任務要阻塞29.5s才能執行,因為立馬執行沒有任何意義,畢竟前面已經執行過一次了
  • 如果在29.5s+之內,資料發生變化,需要提前通知。需要有一種監控機制

基於這些猜想,我們可以看看它的實現過程

從程式碼粗粒度來看,它的實現似乎和我們的猜想一致,在run方法中,通過scheduler.schedule實現了一個定時任務,它的delay時間正好是前面計算的29.5s。在這個任務中,會通過MD5Util.compareMd5來進行計算

那另外一個,當資料發生變化以後,肯定不能等到29.5s之後才通知呀,那怎麼辦呢?我們發現有一個allSubs的東西,它似乎和釋出訂閱有關係。那是不是有可能當前的clientLongPolling訂閱了資料變化的事件呢?

class ClientLongPolling implements Runnable {
    @Override
    public void run() {
        //構建一個非同步任務,延後29.5s執行
        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
            @Override
            public void run() { //如果達到29.5s,說明這個期間沒有做任何設定修改,則自動觸發執行
                try {
                    getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                    // Delete subsciber's relations.
                    allSubs.remove(ClientLongPolling.this); //移除訂閱關係
                    if (isFixedPolling()) { //如果是固定間隔的長輪訓
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        //比較變更的key
                        List<String> changedGroups = MD5Util
                                .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                        if (changedGroups.size() > 0) {//如果大於0,表示有變更,直接響應
                            sendResponse(changedGroups);
                        } else {
                            sendResponse(null); //否則返回null
                        }
                    } else {
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        sendResponse(null);
                    }
                } catch (Throwable t) {
                    LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                }
            }
        }, timeoutTime, TimeUnit.MILLISECONDS);
        allSubs.add(this);  //把當前執行緒新增到訂閱事件佇列中
    }
}

allSubs

allSubs是一個佇列,佇列裡面放了ClientLongPolling這個物件。這個佇列似乎和設定變更有某種關聯關係。

那麼這裡必須要實現的是,當用戶在nacos 控制檯修改了設定之後,必須要從這個訂閱關係中取出關注的使用者端長連線,然後把變更的結果返回。於是我們去看LongPollingService的構造方法查詢訂閱關係

/**
 * 長輪詢訂閱關係
 */
final Queue<ClientLongPolling> allSubs;
allSubs.add(this);

LongPollingService

在LongPollingService的構造方法中,使用了一個NotifyCenter訂閱了一個事件,其中不難發現,如果這個事件的範例是LocalDataChangeEvent,也就是伺服器端資料發生變更的時間,就會執行一個DataChangeTask的執行緒。

public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
    //註冊LocalDataChangeEvent訂閱事件
    NotifyCenter.registerSubscriber(new Subscriber() {
        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) { //如果觸發了LocalDataChangeEvent,則執行下面的程式碼
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }
        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });
}

DataChangeTask

資料變更事件執行緒,程式碼如下

class DataChangeTask implements Runnable {
    @Override
    public void run() {
        try {
            ConfigCacheService.getContentBetaMd5(groupKey); //
            //遍歷所有訂閱事件表
            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                ClientLongPolling clientSub = iter.next(); //得到ClientLongPolling
                //判斷當前的ClientLongPolling中,請求的key是否包含當前修改的groupKey
                if (clientSub.clientMd5Map.containsKey(groupKey)) {
                    // If published tag is not in the beta list, then it skipped.
                    if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { //如果是beta方式且betaIps不包含當前使用者端ip,直接返回
                        continue;
                    }
                    // If published tag is not in the tag list, then it skipped.
                    if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {//如果設定了tag標籤且不包含當前使用者端的tag,直接返回
                        continue;
                    }
					//
                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                    iter.remove(); // Delete subscribers' relationships. 移除當前使用者端的訂閱關係
                    LogUtil.CLIENT_LOG
                            .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                                    RequestUtil
                                            .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                    "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                    clientSub.sendResponse(Arrays.asList(groupKey)); //響應使用者端請求。
                }
            }
        } catch (Throwable t) {
            LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
        }
    }
}

原理總結

以上就是Nacos使用者端設定中心快取動態更新實現原始碼的詳細內容,更多關於Nacos使用者端設定中心快取動態更新的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com