<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Nacos 作為設定中心,當應用程式去存取Nacos動態獲取設定源之後,會快取到本地記憶體以及磁碟中。
由於Nacos作為動態設定中心,意味著後續設定變更之後需要讓所有相關的使用者端感知,並更新本地記憶體!
那麼這個功能是在哪裡實現的呢? 以及它是採用什麼樣的方式來實現設定的更新的呢? 我們一起來探索一下原始碼的實現!
當用戶端拿到設定後,需要動態重新整理,從而保證資料和伺服器端是一致的,這個過程是如何實現的呢?在這一小節中我們來做一個詳細分析。
Nacos採用長輪訓機制來實現資料變更的同步,原理如下!
整體工作流程如下:
在NacosConfigService的構造方法中,當這個類被範例化以後,有做一些事情
public NacosConfigService(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { this.encode = Constants.ENCODE; } else { this.encode = encodeTmp.trim(); } initNamespace(properties); // this.configFilterChainManager = new ConfigFilterChainManager(properties); //初始化網路通訊元件 this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); //初始化ClientWorker this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); }
在上述初始化程式碼中,我們重點需要關注ClientWorker這個類,它的構造方法如下
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; //初始化設定過濾管理器 // Initialize the timeout parameter init(properties); //初始化設定 //初始化一個定時排程的執行緒池,重寫了threadfactory方法 this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); //初始化一個定時排程的執行緒池,從裡面的name名字來看,似乎和長輪訓有關係。而這個長輪訓應該是和nacos伺服器端的長輪訓 this.executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); //設定定時任務的執行頻率,並且呼叫checkConfigInfo這個方法,猜測是定時去檢測設定是否發生了變化 //首次執行延遲時間為1毫秒、延遲時間為10毫秒 this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
可以看到 ClientWorker 除了將 HttpAgent 維持在自己內部,還建立了兩個執行緒池:
第一個執行緒池是隻擁有一個執行緒用來執行定時任務的 executor,executor 每隔 10ms 就會執行一次 checkConfigInfo() 方法,從方法名上可以知道是每 10 ms 檢查一次設定資訊。
第二個執行緒池是一個普通的執行緒池,從 ThreadFactory 的名稱可以看到這個執行緒池是做長輪詢的。
ClientWorker構造初始化中,啟動了一個定時任務去執行checkConfigInfo()
方法,這個方法主要是定時檢查本地設定和伺服器上的設定的變更情況,這個方法定義如下.
public void checkConfigInfo() { // Dispatch tasks. int listenerSize = cacheMap.size(); // // Round up the longingTaskCount. // 向上取整為批數,監聽的設定數量除以3000,得到一個整數,代表長輪訓任務的數量 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); //currentLongingTaskCount表示當前的長輪訓任務數量,如果小於計算的結果,則可以繼續建立 if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // The task list is no order.So it maybe has issues when changing. executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
這個方法主要的目的是用來檢查伺服器端的設定資訊是否發生了變化。如果有變化,則觸發listener通知
cacheMap: AtomicReference<Map<String, CacheData>> cacheMap 用來儲存監聽變更的快取集合。key是根據dataID/group/tenant(租戶) 拼接的值。Value是對應儲存在nacos伺服器上的組態檔的內容。
預設情況下,每個長輪訓LongPullingRunnable任務預設處理3000個監聽設定集。如果超過3000, 則需要啟動多個LongPollingRunnable去執行。
currentLongingTaskCount儲存已啟動的LongPullingRunnable任務數
executorService
就是在ClientWorker構造方法中初始化的執行緒池
LongPollingRunnable長輪訓任務的實現邏輯,程式碼比較長,我們分段來分析。
第一部分主要有兩個邏輯
class LongPollingRunnable implements Runnable { private final int taskId; //表示當前任務批次id public LongPollingRunnable(int taskId) { this.taskId = taskId; } @Override public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // 遍歷CacheMap,把CacheMap中和當前任務id相同的快取,儲存到cacheDatas // 通過checkLocalConfig方法 for (CacheData cacheData : cacheMap.values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { //這裡表示資料有變化,需要通知監聽器 cacheData.checkListenerMd5(); //通知所有針對當前設定設定了監聽的監聽器 } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } //省略部分 } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); //出現異常,到下一次taskPenaltyTime後重新執行任務 } } }
檢查本地設定,這裡面有三種情況
private void checkLocalConfig(CacheData cacheData) { final String dataId = cacheData.dataId; final String group = cacheData.group; final String tenant = cacheData.tenant; File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); // 沒有 -> 有 if (!cacheData.isUseLocalConfigInfo() && path.exists()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); String encryptedDataKey = LocalEncryptedDataKeyProcessor .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant); cacheData.setEncryptedDataKey(encryptedDataKey); LOGGER.warn( "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); return; } // 有 -> 沒有。不通知業務監聽器,從server拿到設定後通知。 // If use local config info, then it doesn't notify business listener and notify after getting from server. if (cacheData.isUseLocalConfigInfo() && !path.exists()) { cacheData.setUseLocalConfigInfo(false); LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); return; } // 有變更 if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path .lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); String encryptedDataKey = LocalEncryptedDataKeyProcessor .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant); cacheData.setEncryptedDataKey(encryptedDataKey); LOGGER.warn( "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); } }
遍歷使用者自己新增的監聽器,如果發現資料的md5值不同,則傳送通知
void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); } } }
在LongPollingRunnable.run中,先通過本地設定的讀取和檢查來判斷資料是否發生變化從而實現變化的通知
接著,當前的執行緒還需要去遠端伺服器上獲得最新的資料,檢查哪些資料發生了變化
// check server config //從伺服器端獲取發生變化的資料的DataID列表,儲存在List<String>集合中 List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } //遍歷發生了變更的設定項 for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { //逐項根據這些設定項獲取設定資訊 ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L); //把設定資訊儲存到CacheData中 CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(response.getContent()); cache.setEncryptedDataKey(response.getEncryptedDataKey()); if (null != response.getConfigType()) { cache.setType(response.getConfigType()); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(response.getContent()), response.getConfigType()); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } //再遍歷CacheData這個集合,找到發生變化的資料進行通知 for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); //繼續傳遞當前執行緒進行輪詢 executorService.execute(this);
這個方法主要是向伺服器端發起檢查請求,判斷自己原生的設定和伺服器端的設定是否一致。
/** * 從Server獲取值變化了的DataID列表。返回的物件裡只有dataId和group是有效的。 保證不返回NULL。 */ List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException { StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDatas) { //把需要檢查的設定項,拼接成一個字串 if (!cacheData.isUseLocalConfigInfo()) { //找到isUseLocalConfigInfo=false的快取 sb.append(cacheData.dataId).append(WORD_SEPARATOR); sb.append(cacheData.group).append(WORD_SEPARATOR); if (StringUtils.isBlank(cacheData.tenant)) { sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); } else { sb.append(cacheData.getMd5()).append(WORD_SEPARATOR); sb.append(cacheData.getTenant()).append(LINE_SEPARATOR); } if (cacheData.isInitializing()) {// // cacheData 首次出現在cacheMap中&首次check更新 inInitializingCacheList .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant)); } } } boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); return checkUpdateConfigStr(sb.toString(), isInitializingCacheList); }
從Server獲取值變化了的DataID列表。返回的物件裡只有dataId和group是有效的。 保證不返回NULL。
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception { //拼接引數和header Map<String, String> params = new HashMap<String, String>(2); params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); Map<String, String> headers = new HashMap<String, String>(2); headers.put("Long-Pulling-Timeout", "" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.put("Long-Pulling-Timeout-No-Hangup", "true"); } if (StringUtils.isBlank(probeUpdateString)) {//判斷可能發生變更的字串是否為空,如果是,則直接返回。 return Collections.emptyList(); } try { // In order to prevent the server from handling the delay of the client's long task, // increase the client's read timeout to avoid this problem. // 設定readTimeoutMs,也就是本次請求等待響應的超時時間,預設是30s long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); //發起遠端呼叫 HttpRestResult<String> result = agent .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs); if (result.ok()) { //如果響應成功 setHealthServer(true); return parseUpdateDataIdResponse(result.getData()); //解析並更新資料,返回的是確實發生了資料變更的字串:tenant/group/dataid。 } else {//如果響應失敗 setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.getCode()); } } catch (Exception e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }
整體實現的核心點就一下幾個部分
對本地快取的設定做任務拆分,每一個批次是3000條
針對每3000條建立一個執行緒去執行
先把每一個批次的快取和本地磁碟檔案中的資料進行比較,
先以tenent/groupId/dataId拼接成字串,傳送到伺服器端進行檢查,返回發生了變更的設定
使用者端收到變更設定列表,再逐項遍歷傳送到伺服器端獲取設定內容。
分析完使用者端之後,隨著好奇心的驅使,伺服器端是如何處理使用者端的請求的?那麼同樣,我們需要思考幾個問題
使用者端發起的請求地址是:/v1/cs/configs/listener
,於是找到這個介面進行檢視,程式碼如下。
//# ConfigController.java @PostMapping("/listener") @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); String probeModify = request.getParameter("Listening-Configs"); if (StringUtils.isBlank(probeModify)) { throw new IllegalArgumentException("invalid probeModify"); } probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); Map<String, String> clientMd5Map; try { //解析使用者端傳遞過來的可能發生變化的設定專案,轉化為Map集合(key=dataId,value=md5) clientMd5Map = MD5Util.getClientMd5Map(probeModify); } catch (Throwable e) { throw new IllegalArgumentException("invalid probeModify"); } // 開始執行長輪訓。 inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); }
這個方法主要是用來做長輪訓和短輪詢的判斷
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException { // 判斷當前請求是否支援長輪訓。() if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; } //如果是短輪詢,走下面的請求,下面的請求就是把使用者端傳過來的資料和伺服器端的資料逐項進行比較,儲存到changeGroups中。 // Compatible with short polling logic. List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map); // Compatible with short polling result. String oldResult = MD5Util.compareMd5OldResult(changedGroups); String newResult = MD5Util.compareMd5ResultString(changedGroups); String version = request.getHeader(Constants.CLIENT_VERSION_HEADER); if (version == null) { version = "2.0.0"; } int versionNum = Protocol.getVersionNumber(version); // Before 2.0.4 version, return value is put into header. if (versionNum < START_LONG_POLLING_VERSION_NUM) { response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult); response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult); } else { request.setAttribute("content", newResult); } Loggers.AUTH.info("new content:" + newResult); // Disable cache. response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); response.setStatus(HttpServletResponse.SC_OK); return HttpServletResponse.SC_OK + ""; }
把使用者端的請求,儲存到長輪訓的執行引擎中。
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { //獲取使用者端長輪訓的超時時間 String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); //不允許斷開的標記 String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); //應用名稱 String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); // String tag = req.getHeader("Vipserver-Tag"); //延期時間,預設為500ms int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout. // 提前500ms返回一個響應,避免使用者端出現超時 long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // Do nothing but set fix polling timeout. } else { long start = System.currentTimeMillis(); //通過md5判斷使用者端請求過來的key是否有和伺服器端有不一致的,如果有,則儲存到changedGroups中。 List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { //如果發現有變更,則直接把請求返回給使用者端 generateResponse(req, rsp, changedGroups); LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { //如果noHangUpFlag為true,說明不需要掛起使用者端,所以直接返回。 LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } //獲取請求端的ip String ip = RequestUtil.getRemoteIp(req); // Must be called by http thread, or send response. //把當前請求轉化為一個非同步請求(意味著此時tomcat執行緒被釋放,也就是使用者端的請求,需要通過asyncContext來手動觸發返回,否則一直掛起) final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout() is incorrect, Control by oneself asyncContext.setTimeout(0L); //設定非同步請求超時時間, //執行長輪訓請求 ConfigExecutor.executeLongPolling( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }
接下來我們來分析一下,clientLongPolling到底做了什麼操作。或者說我們可以先猜測一下應該會做什麼事情
基於這些猜想,我們可以看看它的實現過程
從程式碼粗粒度來看,它的實現似乎和我們的猜想一致,在run方法中,通過scheduler.schedule實現了一個定時任務,它的delay時間正好是前面計算的29.5s。在這個任務中,會通過MD5Util.compareMd5來進行計算
那另外一個,當資料發生變化以後,肯定不能等到29.5s之後才通知呀,那怎麼辦呢?我們發現有一個allSubs
的東西,它似乎和釋出訂閱有關係。那是不是有可能當前的clientLongPolling訂閱了資料變化的事件呢?
class ClientLongPolling implements Runnable { @Override public void run() { //構建一個非同步任務,延後29.5s執行 asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() { @Override public void run() { //如果達到29.5s,說明這個期間沒有做任何設定修改,則自動觸發執行 try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); // Delete subsciber's relations. allSubs.remove(ClientLongPolling.this); //移除訂閱關係 if (isFixedPolling()) { //如果是固定間隔的長輪訓 LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); //比較變更的key List<String> changedGroups = MD5Util .compareMd5((HttpServletRequest) asyncContext.getRequest(), (HttpServletResponse) asyncContext.getResponse(), clientMd5Map); if (changedGroups.size() > 0) {//如果大於0,表示有變更,直接響應 sendResponse(changedGroups); } else { sendResponse(null); //否則返回null } } else { LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } catch (Throwable t) { LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause()); } } }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this); //把當前執行緒新增到訂閱事件佇列中 } }
allSubs是一個佇列,佇列裡面放了ClientLongPolling這個物件。這個佇列似乎和設定變更有某種關聯關係。
那麼這裡必須要實現的是,當用戶在nacos 控制檯修改了設定之後,必須要從這個訂閱關係中取出關注的使用者端長連線,然後把變更的結果返回。於是我們去看LongPollingService的構造方法查詢訂閱關係
/** * 長輪詢訂閱關係 */ final Queue<ClientLongPolling> allSubs; allSubs.add(this);
在LongPollingService的構造方法中,使用了一個NotifyCenter訂閱了一個事件,其中不難發現,如果這個事件的範例是LocalDataChangeEvent,也就是伺服器端資料發生變更的時間,就會執行一個DataChangeTask
的執行緒。
public LongPollingService() { allSubs = new ConcurrentLinkedQueue<ClientLongPolling>(); ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS); // Register LocalDataChangeEvent to NotifyCenter. NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize); //註冊LocalDataChangeEvent訂閱事件 NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(Event event) { if (isFixedPolling()) { // Ignore. } else { if (event instanceof LocalDataChangeEvent) { //如果觸發了LocalDataChangeEvent,則執行下面的程式碼 LocalDataChangeEvent evt = (LocalDataChangeEvent) event; ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } } @Override public Class<? extends Event> subscribeType() { return LocalDataChangeEvent.class; } }); }
資料變更事件執行緒,程式碼如下
class DataChangeTask implements Runnable { @Override public void run() { try { ConfigCacheService.getContentBetaMd5(groupKey); // //遍歷所有訂閱事件表 for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); //得到ClientLongPolling //判斷當前的ClientLongPolling中,請求的key是否包含當前修改的groupKey if (clientSub.clientMd5Map.containsKey(groupKey)) { // If published tag is not in the beta list, then it skipped. if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { //如果是beta方式且betaIps不包含當前使用者端ip,直接返回 continue; } // If published tag is not in the tag list, then it skipped. if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {//如果設定了tag標籤且不包含當前使用者端的tag,直接返回 continue; } // getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // Delete subscribers' relationships. 移除當前使用者端的訂閱關係 LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); clientSub.sendResponse(Arrays.asList(groupKey)); //響應使用者端請求。 } } } catch (Throwable t) { LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t)); } } }
以上就是Nacos使用者端設定中心快取動態更新實現原始碼的詳細內容,更多關於Nacos使用者端設定中心快取動態更新的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45