<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
從遠端伺服器獲取變更資料的主要模式有兩種:推(push)和拉(pull)。Push 模式簡單來說就是伺服器端主動將資料變更資訊推播給使用者端,這種模式優點是時效性好,伺服器端資料發生變更可以立馬通知到使用者端,但這種模式需要伺服器端維持與使用者端的心跳連線,會增加伺服器端實現的複雜度,伺服器端也需要佔用更多的資源來維持與使用者端的連線。
而 Pull 模式則是使用者端主動去伺服器請求資料,例如,每間隔10ms就向伺服器端發起請求獲取資料。顯而易見pull模式存在時效性問題。
請求的間隔也不太好設定,間隔太短,對伺服器請求壓力過大。間隔時間過長,那麼必然會造成時效性很差。而且如果設定長時間不更新,並且存在大量的使用者端就會產生大量無效的pull請求。
Nacos 沒有采用上述的兩種模式,而是採用了長輪詢方式結合了推和拉的優點:
如果你覺得原始碼枯燥的話,可以選擇不看後半部分的原始碼,先通過這張流程圖去了解Nacos動態重新整理機制的流程:
首先,開啟 com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration
這個類,從類名也可以看出該類是Nacos Config的啟動設定類,是Nacos Config自動裝配的入口。在該類中的 nacosConfigManager
方法範例化了一個 NacosConfigManager
物件,並註冊到容器中:
@Bean @ConditionalOnMissingBean public NacosConfigManager nacosConfigManager( NacosConfigProperties nacosConfigProperties) { return new NacosConfigManager(nacosConfigProperties); }
在 NacosConfigManager
的構造器中呼叫了 createConfigService
方法,這是一個靜態方法用來建立 ConfigService
物件的單例。
/** * Compatible with old design,It will be perfected in the future. */ static ConfigService createConfigService( NacosConfigProperties nacosConfigProperties) { // 雙重檢查鎖模式的單例 if (Objects.isNull(service)) { synchronized (NacosConfigManager.class) { try { if (Objects.isNull(service)) { service = NacosFactory.createConfigService( nacosConfigProperties.assembleConfigServiceProperties()); } } catch (NacosException e) { log.error(e.getMessage()); throw new NacosConnectionFailureException( nacosConfigProperties.getServerAddr(), e.getMessage(), e); } } } return service; }
ConfigService
的具體實現是 NacosConfigService
,在該類的構造器中主要初始化了 HttpAgent
和 ClientWorker
物件。
ClientWorker
的構造器中則初始化了幾個執行緒池:
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter init(properties); // 建立具有定時執行功能的單執行緒池,用於定時執行 checkConfigInfo 方法 this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); // 建立具有定時執行功能的且執行緒數與cpu核數相對應的執行緒池,用於根據需要動態重新整理的組態檔執行 LongPollingRunnable,因此長輪詢任務是可以有多個並行的 this.executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); // 每10ms執行一次 checkConfigInfo 方法 this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); } private void init(Properties properties) { // 長輪詢的超時時間,預設為30秒,此引數會被放到請求頭中帶到伺服器端,伺服器端會根據該引數去做長輪詢的hold timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT), Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT); taskPenaltyTime = ConvertUtils .toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME); this.enableRemoteSyncConfig = Boolean .parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG)); } /** * Check config info. */ public void checkConfigInfo() { // Dispatch taskes. // 獲取需要監聽的檔案數量 int listenerSize = cacheMap.size(); // Round up the longingTaskCount. // 預設一個 LongPollingRunnable 可以處理監聽3k個組態檔的變化,超過3k個才會建立新的 LongPollingRunnable int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // The task list is no order.So it maybe has issues when changing. executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
LongPollingRunnable
類主要用於檢查本地設定,以及長輪詢地去伺服器端獲取變更設定的 dataid 和 group,其程式碼位於 com.alibaba.nacos.client.config.impl.ClientWorker
類,程式碼如下:
class LongPollingRunnable implements Runnable { private final int taskId; public LongPollingRunnable(int taskId) { this.taskId = taskId; } @Override public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // check failover config // 遍歷本地快取的設定 for (CacheData cacheData : cacheMap.values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { // 檢查本地設定 checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } // check server config // 通過長輪詢檢查伺服器端設定 List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String[] ct = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } }
上面有個 checkUpdateDataIds
方法,用於獲取發生變更了的組態檔的dataId列表,它同樣位於 ClientWorker
內。
如下:
/** * Fetch the dataId list from server. * * @param cacheDatas CacheDatas for config infomations. * @param inInitializingCacheList initial cache lists. * @return String include dataId and group (ps: it maybe null). * @throws Exception Exception. */ List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception { // 拼接出組態檔的唯一標識 StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDatas) { if (!cacheData.isUseLocalConfigInfo()) { sb.append(cacheData.dataId).append(WORD_SEPARATOR); sb.append(cacheData.group).append(WORD_SEPARATOR); if (StringUtils.isBlank(cacheData.tenant)) { sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); } else { sb.append(cacheData.getMd5()).append(WORD_SEPARATOR); sb.append(cacheData.getTenant()).append(LINE_SEPARATOR); } if (cacheData.isInitializing()) { // It updates when cacheData occours in cacheMap by first time. inInitializingCacheList .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant)); } } } boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); return checkUpdateConfigStr(sb.toString(), isInitializingCacheList); } /** * Fetch the updated dataId list from server. * * @param probeUpdateString updated attribute string value. * @param isInitializingCacheList initial cache lists. * @return The updated dataId list(ps: it maybe null). * @throws IOException Exception. */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception { Map<String, String> params = new HashMap<String, String>(2); params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); Map<String, String> headers = new HashMap<String, String>(2); // 長輪詢的超時時間 headers.put("Long-Pulling-Timeout", "" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.put("Long-Pulling-Timeout-No-Hangup", "true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try { // In order to prevent the server from handling the delay of the client's long task, // increase the client's read timeout to avoid this problem. long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); // 向伺服器端發起一個http請求,該請求在伺服器端設定沒有變更的情況下預設會hang住30s HttpRestResult<String> result = agent .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs); if (result.ok()) { setHealthServer(true); // 響應狀態是成功則解析響應體得到 dataId、group、tenant 等資訊並返回 return parseUpdateDataIdResponse(result.getData()); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.getCode()); } } catch (Exception e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }
使用者端對 listener
介面的請求會進入到伺服器端的com.alibaba.nacos.config.server.controller.ConfigController#listener
方法進行處理,該方法主要是呼叫了 com.alibaba.nacos.config.server.controller.ConfigServletInner#doPollingConfig
方法。
程式碼如下:
/** * 輪詢介面 */ public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException, ServletException { // 如果支援長輪詢則進入長輪詢的流程 if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; } // else 相容短輪詢邏輯 List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map); // 相容短輪詢result String oldResult = MD5Util.compareMd5OldResult(changedGroups); String newResult = MD5Util.compareMd5ResultString(changedGroups); String version = request.getHeader(Constants.CLIENT_VERSION_HEADER); if (version == null) { version = "2.0.0"; } int versionNum = Protocol.getVersionNumber(version); /** * 2.0.4版本以前, 返回值放入header中 */ if (versionNum < START_LONGPOLLING_VERSION_NUM) { response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult); response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult); } else { request.setAttribute("content", newResult); } // 禁用快取 response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); response.setStatus(HttpServletResponse.SC_OK); return HttpServletResponse.SC_OK + ""; }
我們主要關注上面的 com.alibaba.nacos.config.server.service.LongPollingService#addLongPollingClient
長輪詢流程的方法。
程式碼如下:
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提前500ms返回響應,為避免使用者端超時 @qiaoyi.dingqy 2013.10.22改動 add delay time for LoadBalance */ long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // do nothing but set fix polling timeout } else { long start = System.currentTimeMillis(); List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // 一定要由HTTP執行緒呼叫,否則離開後容器會立即傳送響應 final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout()的超時時間不準,所以只能自己控制 asyncContext.setTimeout(0L); // 在 ClientLongPolling 的 run 方法會將 ClientLongPolling 範例(攜帶了本次請求的相關資訊)放入 allSubs 中,然後會在29.5s後再執行另一個 Runnable,該 Runnable 用於等待29.5s後依舊沒有相應的設定變更時對使用者端進行響應,並將相應的 ClientLongPolling 範例從 allSubs 中移出 scheduler.execute( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }
而 LongPollingService
實現了 AbstractEventListener
,也就是說能接收事件通知,在其 com.alibaba.nacos.config.server.service.LongPollingService#onEvent
方法中可以看到,它關注的是 LocalDataChangeEvent
事件:
@Override public void onEvent(Event event) { if (isFixedPolling()) { // ignore } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent)event; scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } }
在nacos上修改設定後就會產生 LocalDataChangeEvent
事件,此時 LongPollingService
也就能監聽到,當收到該事件時就會遍歷 allSubs
,找到匹配的請求並將 groupKey
返回給使用者端。
具體程式碼在 DataChangeTask
中:
class DataChangeTask implements Runnable { @Override public void run() { try { ConfigService.getContentBetaMd5(groupKey); for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { // 如果beta釋出且不在beta列表直接跳過 if (isBeta && !betaIps.contains(clientSub.ip)) { continue; } // 如果tag釋出且不在tag列表直接跳過 if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) { continue; } getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // 刪除訂閱關係 LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause()); } } DataChangeTask(String groupKey) { this(groupKey, false, null); } DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps) { this(groupKey, isBeta, betaIps, null); } DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps, String tag) { this.groupKey = groupKey; this.isBeta = isBeta; this.betaIps = betaIps; this.tag = tag; } final String groupKey; final long changeTime = System.currentTimeMillis(); final boolean isBeta; final List<String> betaIps; final String tag; }
當用戶端收到變更的dataid+group後,就會去伺服器端獲取最新的設定資料,並更新本地資料 cacheData
,然後傳送資料變更事件,整個流程結束。
com.alibaba.nacos.client.config.impl.ClientWorker#getServerConfig
com.alibaba.nacos.client.config.impl.CacheData#checkListenerMd5
最後附上一張流程與原始碼的對應圖:
以上就是Spring Cloud整合Nacos Config動態重新整理原始碼剖析的詳細內容,更多關於Spring Cloud整合Nacos Config的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45