<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Nacos作為設定中心,必然需要保證服務節點的高可用性,那麼Nacos是如何實現叢集的呢?
下面這個圖,表示Nacos叢集的部署圖。
Nacos作為設定中心的叢集結構中,是一種無中心化節點的設計,由於沒有主從節點,也沒有選舉機制,所以為了能夠實現熱備,就需要增加虛擬IP(VIP)。
Nacos的資料儲存分為兩部分
在Nacos的設計中,Mysql是一箇中心資料倉儲,且認為在Mysql中的資料是絕對正確的。 除此之外,Nacos在啟動時會把Mysql中的資料寫一份到本地磁碟。
這麼設計的好處是可以提高效能,當用戶端需要請求某個設定項時,伺服器端會想Ian從磁碟中讀取對應檔案返回,而磁碟的讀取效率要比資料庫效率高。
當設定發生變更時:
另外,NacosServer啟動後,會同步啟動一個定時任務,每隔6小時,會dump一次全量資料到本地檔案
當設定發生修改、刪除、新增操作時,通過釋出一個notifyConfigChange
事件。
@PostMapping @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag, @RequestParam(value = "appName", required = false) String appName, @RequestParam(value = "src_user", required = false) String srcUser, @RequestParam(value = "config_tags", required = false) String configTags, @RequestParam(value = "desc", required = false) String desc, @RequestParam(value = "use", required = false) String use, @RequestParam(value = "effect", required = false) String effect, @RequestParam(value = "type", required = false) String type, @RequestParam(value = "schema", required = false) String schema) throws NacosException { //省略.. if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true); ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true); ConfigChangePublisher.notifyConfigChange( new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } }//省略 return true; }
設定資料變更事件,專門有一個監聽器AsyncNotifyService,它會處理資料變更後的同步事件。
@Autowired public AsyncNotifyService(ServerMemberManager memberManager) { this.memberManager = memberManager; // Register ConfigDataChangeEvent to NotifyCenter. NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); // Register A Subscriber to subscribe ConfigDataChangeEvent. NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(Event event) { // Generate ConfigDataChangeEvent concurrently if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; Collection<Member> ipList = memberManager.allMembers(); //得到叢集中的ip列表 // 構建NotifySingleTask,並新增到佇列中。 Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>(); for (Member member : ipList) { //遍歷叢集中的每個節點 queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta)); } //非同步執行任務 AsyncTask ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue)); } } @Override public Class<? extends Event> subscribeType() { return ConfigDataChangeEvent.class; } }); }
@Override public void run() { executeAsyncInvoke(); } private void executeAsyncInvoke() { while (!queue.isEmpty()) {//遍歷佇列中的資料,直到資料為空 NotifySingleTask task = queue.poll(); //獲取task String targetIp = task.getTargetIP(); //獲取目標ip if (memberManager.hasMember(targetIp)) { //如果叢集中的ip列表包含目標ip // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify //判斷目標ip的健康狀態 boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); // if (unHealthNeedDelay) { //如果目標服務是非健康,則繼續新增到佇列中,延後再執行。 // target ip is unhealthy, then put it in the notification list ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task asyncTaskExecute(task); } else { //構建header Header header = Header.newInstance(); header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified())); header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP()); if (task.isBeta) { header.addParam("isBeta", "true"); } AuthHeaderUtil.addIdentityToHeader(header); //通過restTemplate發起遠端呼叫,如果呼叫成功,則執行AsyncNotifyCallBack的回撥方法 restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task)); } } } }
資料同步的請求地址為,task.url=http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP
@GetMapping("/dataChange") public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "tag", required = false) String tag) { dataId = dataId.trim(); group = group.trim(); String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED); long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified); String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP); String isBetaStr = request.getHeader("isBeta"); if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) { dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true); } else { // dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp); } return true; }
dumpService.dump用來實現設定的更新,程式碼如下
當前任務會被新增到DumpTaskMgr中管理。
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag); dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta)); DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); }
TaskManager.addTask, 先呼叫父類別去完成任務新增。
@Override public void addTask(Object key, AbstractDelayTask newTask) { super.addTask(key, newTask); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); }
在這種場景設計中,一般都會採用生產者消費者模式來完成,因此這裡不難猜測到,任務會被儲存到一個佇列中,然後有另外一個執行緒來執行。
TaskManager的父類別是NacosDelayTaskExecuteEngine,
這個類中有一個成員屬性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;,專門來儲存延期執行的任務型別AbstractDelayTask.
在這個類的構造方法中,初始化了一個延期執行的任務,其中具體的任務是ProcessRunnable.
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); }
private class ProcessRunnable implements Runnable { @Override public void run() { try { processTasks(); } catch (Throwable e) { getEngineLog().error(e.toString(), e); } } }
protected void processTasks() { //獲取所有的任務 Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } //獲取任務處理器,這裡返回的是DumpProcessor NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // ReAdd task if process failed //執行具體任務 if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } } }
讀取資料庫的最新資料,然後更新本地快取和磁碟
以上就是Nacos設定中心叢集原理及原始碼分析的詳細內容,更多關於Nacos設定中心叢集原理的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45