<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
本系列RocketMQ4.8註釋github地址,希望對大家有所幫助,要是覺得可以的話麻煩給點一下Star哈
首先我們需要介紹下在RocketMQ中哪些檔案需要清理,其實可以想一想,在RocketMQ中哪些檔案是一直在往裡面寫入東西的,最容易想到的就是commitlog
了,因為在一個broker 程序中,所有的普通訊息,事務訊息,系統訊息啥的都往這個commitlog
中寫,隨著時間的越來越長,然後commitlog
就會越積攢越多,肯定會有磁碟放不下的那一天,而且我們訊息消費完成後,那些被消費完成後的訊息其實作用就很小了,可能會有這麼一個場景,比如說我線上出現了某個問題,我想看下關於這個問題的訊息有沒有被消費到,可能你會用到這個訊息,但是這種問題一般就是比較緊急的,最近實效的,之前那些訊息其實作用就基本沒有了,所以就需要清理掉之前的訊息。其實不光commitlog
需要清理,還需要清理一下ConsumeQueue
與indexFile
, 因為你commitlog
裡面的訊息都被清理了,ConsumeQueue
與indexFile
再儲存著之前的一些資料,就是純粹浪費空間了。
所以說 broker
檔案清理主要是清理commitlog , ConsumeQueue , indexFile
。
我們介紹下RocketMQ
檔案清理的機制,RocketMQ
預設是清理72小時之前的訊息
,然後它有幾個觸發條件, 預設是凌晨4點觸發清
理, 除非你你這個磁碟空間佔用到75%
以上了。在清理commitlog
的時候,並不是一條訊息一條訊息的清理,拿到所有的MappedFile
(拋去現在還在用著的,也就是最後一個) ,然後比對每個MappedFile
的最後一條訊息的時間,如果是72小時之前
的就把MappedFile
對應的檔案刪除了,銷燬對應MappedFile
,這種情況的話只要你MappedFile
最後一條訊息還在存活實效內的話
,它就不會清理你這個MappedFile
,就算你這個MappedFile
靠前的訊息過期了。但是有一種情況它不管你訊息超沒超過72小時,直接就是刪,那就是磁碟空間不足的時候,也就是佔了85%
以上了,就會立即清理。
清理完成commitlog
之後,就會拿到commitlog
中最小的offset
,然後去ConsumeQueue
與indexFile
中把小於offset
的記錄刪除掉。清理ConsumeQueue
的時候也是遍歷MappedFile
,然後它的最後一條訊息(unit)小於commitlog
中最小的offset
的話,就說明這個MappedFile
都小於offset
,因為他們是順序追加寫的,這個MappedFile 就會清理掉,如果你MappedFile
最後一個unit不是小於offset
的話,這個MappedFile
就不刪了。
我們來看下原始碼是怎樣實現的: 在broker 記憶體DefaultMessageStore
啟動(start)的時候,會新增幾個任務排程,其中有一個就是檔案清理的:
private void addScheduleTask() { // todo 清理過期檔案 每隔10s this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // todo DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); ... }
預設是10s
執行一次,可以看到它呼叫了DefaultMessageStore
的cleanFilesPeriodically
方法:
private void cleanFilesPeriodically() { // todo 清除CommitLog檔案 this.cleanCommitLogService.run(); // todo 清除ConsumeQueue檔案 this.cleanConsumeQueueService.run(); }
我們先來看下關於commitlog
的清理工作:
public void run() { try { // todo 刪除過期檔案 this.deleteExpiredFiles(); this.redeleteHangedFile(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } }
我們看下deleteExpiredFiles
方法的實現:
private void deleteExpiredFiles() { int deleteCount = 0; // 檔案保留時間,如果超過了該時間,則認為是過期檔案,可以被刪除 long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); // 刪除物理檔案的間隔時間,在一次清除過程中,可能需要被刪除的檔案不止一個,該值指定兩次刪除檔案的間隔時間 int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); // 在清除過期檔案時,如 //果該檔案被其他執行緒佔用(參照次數大於0,比如讀取訊息),此時會 //阻止此次刪除任務,同時在第一次試圖刪除該檔案時記錄當前時間 //戳,destroyMapedFileIntervalForcibly表示第一次拒絕刪除之後能 //保留檔案的最大時間,在此時間內,同樣可以被拒絕刪除,超過該時 //間後,會將參照次數設定為負數,檔案將被強制刪除 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); // 指定刪除檔案的時間點,RocketMQ通過deleteWhen設定每天在 //固定時間執行一次刪除過期檔案操作,預設凌晨4點 boolean timeup = this.isTimeToDelete(); // todo 檢查磁碟空間是否充足,如果磁碟空間不充足,則返回true,表示應該觸發過期檔案刪除操作 boolean spacefull = this.isSpaceToDelete(); // 預留手工觸發機制,可以通過呼叫excuteDeleteFilesManualy //方法手工觸發刪除過期檔案的操作,目前RocketMQ暫未封裝手工觸發 //檔案刪除的命令 boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; if (timeup || spacefull || manualDelete) { if (manualDelete) this.manualDeleteFileSeveralTimes--; boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; // todo 檔案的銷燬和刪除 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); } } }
開始幾個引數,一個是檔案保留實效預設是72小時
,你可以使用fileReservedTime
來設定,一個是刪除檔案的間隔100ms
,再就是強行銷燬MappedFile
的120s
(這個為啥要強行銷燬,因為它還害怕還有地方用著這個MappedFile,它有個專門的參照計數器,比如說我還有地方要讀它的訊息,這個時候計數器就是+1的)。
接著就是判斷到沒到刪除的那個時間,它預設是凌晨4點才能刪除
:
private boolean isTimeToDelete() { // 什麼時候刪除,預設是凌晨4點 -> 04 String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen(); // 判斷是不是到點了 就是判斷的當前小時 是不是等於 預設的刪除時間 if (UtilAll.isItTimeToDo(when)) { DefaultMessageStore.log.info("it's time to reclaim disk space, " + when); return true; } return false; }
再接著就是看看空間是不是充足,看看磁碟空間使用佔比是什麼樣子的:
private boolean isSpaceToDelete() { // 表示CommitLog檔案、ConsumeQueue檔案所在磁碟分割區的最大使用量,如果超過該值,則需要立即清除過期檔案 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; // 表示是否需要立即執行清除過期檔案的操作 cleanImmediately = false; { // 當前CommitLog目錄所在的磁碟分割區的磁碟使用率,通過File#getTotalSpace方法獲取檔案所在磁碟分割區的總容量, //通過File#getFreeSpace方法獲取檔案所在磁碟分割區的剩餘容量 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); // diskSpaceWarningLevelRatio:預設0.90。如果磁碟分割區使用率超過該閾值,將設定磁碟為不可寫,此時會拒絕寫入新訊息 // 如果當前磁碟分割區使用率大於diskSpaceWarningLevelRatio,應該立即啟動過期檔案刪除操作 if (physicRatio > diskSpaceWarningLevelRatio) { // 設定 磁碟不可寫 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); } cleanImmediately = true; //diskSpaceCleanForciblyRatio:預設0.85 如果磁碟分割區使用超過該閾值,建議立即執行過期檔案刪除,但不會拒絕寫入新訊息 // 如果當前磁碟分割區使用率大於diskSpaceCleanForciblyRatio,建議立即執行過期檔案清除 } else if (physicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { // 設定 磁碟可以寫入 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); } } // 如果當前磁碟使用率小於diskMaxUsedSpaceRatio,則返回false,表示磁碟使用率正常, // 否則返回true,需要執行刪除過期檔案 if (physicRatio < 0 || physicRatio > ratio) { DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); return true; } } /** * 對consumeQueue 做同樣的判斷 */ ... return false; }
這裡其實不光是判斷 commitlog
的儲存區域,後面還有段判斷ConsumeQueue
的儲存區域的,然後與這塊邏輯一樣,就沒有放上。這裡就是獲取預設的最大使用佔比 就是75%
,接著就是看看commitlog
儲存的那地方使用了多少了,如果是使用90%
了,就設定runningFlag
說磁碟滿了,立即清理設定成true
,這個引數設定成true之後,就不會管你訊息有沒有超過72小時,如果你使用了85%
以上了,也是設定立即清理,如果超過75%
返回true。好了,磁碟佔用空間這塊我們就看完了。
接著看上面deleteExpiredFiles
方法實現,還有一個手動清除
的,這塊我沒有找到哪裡有用到的,如果後續找到,會補充上, 判斷 到了清理的點 或者是磁碟空間滿了 或者是手動刪除了,滿足一個條件就ok了,如果是立即清除是個true,它這裡這個cleanAtOnce
變數就是true了,因為前面那個強制清理是預設開啟
的。
接著計算了一下fileReservedTime
就是將小時轉成了毫秒,為了後面好比對,最後就是呼叫commitlo
g的deleteExpiredFile
方法清理了:
/** * 刪除過期的檔案 * @param expiredTime 過期時間 預設72小時 * @param deleteFilesInterval 刪除檔案的間隔 100ms * @param intervalForcibly 強制刪除 1000 * 120 * @param cleanImmediately 是不是要一次性清理了 * @return */ public int deleteExpiredFile( final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately ) { // todo return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); }
可以看到commitlog
物件呼叫mappedFileQueue
的deleteExpiredFileByTime
方法來處理的,這個mappedFileQueue
就是管理了一堆MappedFile
:
/** * 刪除檔案 * * 從倒數第二個檔案開始遍歷,計算檔案的最大存活時間,即檔案的最後一次更新時間+檔案存活時間(預設 * 72小時),如果當前時間大於檔案的最大存活時間或需要強制刪除文 * 件(當磁碟使用超過設定的閾值)時,執行MappedFile#destory方 * 法,清除MappedFile佔有的相關資源,如果執行成功,將該檔案加入 * 待刪除檔案列表中,最後統一執行File#delete方法將檔案從物理磁碟 * 中刪除。 */ public int deleteExpiredFileByTime(final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) { // 拿到mappedFile的參照 Object[] mfs = this.copyMappedFiles(0); if (null == mfs) return 0; int mfsLength = mfs.length - 1; int deleteCount = 0; List<MappedFile> files = new ArrayList<MappedFile>(); if (null != mfs) { for (int i = 0; i < mfsLength; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; // 計算檔案的最大存活時間,即檔案的最後一次更新時間+檔案存活時間(預設72小時) long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; // 如果當前時間大於檔案的最大存活時間 或 需要強制刪除檔案(當磁碟使用超過設定的閾值)時 if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { // todo 執行destroy方法 if (mappedFile.destroy(intervalForcibly)) { files.add(mappedFile); deleteCount++; // 一批 最多刪除10 個 if (files.size() >= DELETE_FILES_BATCH_MAX) { break; } // 刪除間隔 if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { try { Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break; } } else { //avoid deleting files in the middle break; } } } // todo 統一執行File#delete方法將檔案從物理磁碟中刪除 deleteExpiredFile(files); return deleteCount; }
這裡首先是拿到所有MappedFile
的參照,然後就是遍歷了,可以看到它這個length是-1的,也就是最後一個MappedFile
是遍歷不到的,這個是肯定的,因為最後一個MappedFile
肯定是在用著的,如果你來個強制清理,一下清理了,就沒法提供服務了。
遍歷的時候,拿到對應MappedFile
裡面最後一條訊息,看看它的寫入時間是不是已經過了這個過期時間了,或者直接強制刪除,就會執行MappedFile
的銷燬方法,而且帶著銷燬時間:
/** * 銷燬方法 * @param intervalForcibly 表示拒絕被銷燬的最大存活時間 * @return */ public boolean destroy(final long intervalForcibly) { // todo this.shutdown(intervalForcibly); // 清理結束 if (this.isCleanupOver()) { try { // 關閉檔案通道, this.fileChannel.close(); log.info("close file channel " + this.fileName + " OK"); long beginTime = System.currentTimeMillis(); // 刪除物理檔案 boolean result = this.file.delete(); log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" + this.getFlushedPosition() + ", " + UtilAll.computeElapsedTimeMilliseconds(beginTime)); } catch (Exception e) { log.warn("close file channel " + this.fileName + " Failed. ", e); } return true; } else { log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName + " Failed. cleanupOver: " + this.cleanupOver); } return false; } public void shutdown(final long intervalForcibly) { // 關閉MappedFile if (this.available) { this.available = false; // 初次關閉的時間戳 this.firstShutdownTimestamp = System.currentTimeMillis(); // todo 嘗試釋放資源 this.release(); } else if (this.getRefCount() > 0) { if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) { this.refCount.set(-1000 - this.getRefCount()); this.release(); } } }
這裡就不詳細說了,其實就是shutdown
,然後過了120s後強制把參照清了
,之後就是關閉channel
,刪除對應檔案。
接著往下說,就是銷燬成功了,會記錄刪除數量,判斷刪了多少了,一批是最多刪10個的,這塊應該是怕影響效能的,你一直刪的的話,這東西很消耗磁碟效能,容易影響其他寫入,讀取功能,如果你銷燬失敗,直接就停了。最後就是將刪除的這些MappedFile
從MappedFileQueue
中刪除掉。再回到commitlog clean service
的run
方法:
public void run() { try { // todo 刪除過期檔案 this.deleteExpiredFiles(); // todo this.redeleteHangedFile(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } }
我們deleteExpiredFiles
方法已經介紹完了,然後再來看看第二個方法是幹嘛的,這個其實就是判斷第一個MappedFile
還可不可用了,如果不可用的話,就刪了,這塊有可能是上面 deleteExpiredFiles
方法MappedFile
銷燬失敗,然後設定了不可用,但是沒有清理掉,所以這塊再來善後下:
private void redeleteHangedFile() { // redeleteHangedFileInterval間隔 預設1000*120 int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval(); // 當前時間戳 long currentTimestamp = System.currentTimeMillis(); if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) { this.lastRedeleteTimestamp = currentTimestamp; // 獲取強制銷燬Mapped檔案間隔 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); // todo 重新刪除第一個MappedFile if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) { } } } public boolean retryDeleteFirstFile(final long intervalForcibly) { // 獲取到 第一個mappedFile MappedFile mappedFile = this.getFirstMappedFile(); if (mappedFile != null) { // 不可用 if (!mappedFile.isAvailable()) { log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName()); // 銷燬 boolean result = mappedFile.destroy(intervalForcibly); if (result) { log.info("the mappedFile re delete OK, " + mappedFile.getFileName()); List<MappedFile> tmpFiles = new ArrayList<MappedFile>(); tmpFiles.add(mappedFile); this.deleteExpiredFile(tmpFiles); } else { log.warn("the mappedFile re delete failed, " + mappedFile.getFileName()); } return result; } } return false; }
這塊就是看第一個MappedFile
還可不可用,不可用的話,就銷燬掉。好了commitlog
檔案清理原始碼就解析完成了。接下來看下這個ConsumeQueue與indexFile
的清理。
private void cleanFilesPeriodically() { // todo 清除CommitLog檔案 this.cleanCommitLogService.run(); // todo 清除ConsumeQueue檔案 this.cleanConsumeQueueService.run(); }
DefaultMessageStore.CleanConsumeQueueService#run:
public void run() { try { // 刪除 過期的file this.deleteExpiredFiles(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } }
接下來DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:
private void deleteExpiredFiles() { // 刪除間隔 100 int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval(); // 獲取 commitLog 的最小offset long minOffset = DefaultMessageStore.this.commitLog.getMinOffset(); if (minOffset > this.lastPhysicalMinOffset) { // 上次 清理 到哪了 this.lastPhysicalMinOffset = minOffset; ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; // 遍歷刪除 for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { // 進行刪除 int deleteCount = logic.deleteExpiredFile(minOffset); // 間隔 if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { try { Thread.sleep(deleteLogicsFilesInterval); } catch (InterruptedException ignored) { } } } } // todo 刪除 過期的 indexFile DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); } }
首先是獲取刪除間隔,然後拿到commitlog
中最小的那個offset
,接著就是判斷上次清理位置與最小offset
比較,如果offset
大於它上次清理的位置的話,就說明 它得把最小offset
之前的清理掉。先是記錄最後一次清理的offset
是最小offset
, 接著就是遍歷所有的ConsumeQueue
,呼叫每個ConsumeQueue
的 deleteExpiredFile
方法來清理,我們來看下這個方法:
public int deleteExpiredFile(long offset) { // 進行銷燬 然後得到銷燬個數 int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE); // 糾正最小偏移量 this.correctMinOffset(offset); return cnt; }
CQ_STORE_UNIT_SIZE
這個就是每個unit
佔20個位元組,見。
/** * 刪除過期的file * @param offset 最小offset * @param unitSize 大小為20位元組 * @return */ public int deleteExpiredFileByOffset(long offset, int unitSize) { Object[] mfs = this.copyMappedFiles(0); List<MappedFile> files = new ArrayList<MappedFile>(); int deleteCount = 0; if (null != mfs) { int mfsLength = mfs.length - 1; for (int i = 0; i < mfsLength; i++) { boolean destroy; MappedFile mappedFile = (MappedFile) mfs[i]; // 最後一個單元位置到這個MappedFile結束,其實就是獲取最後一個單元 SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize); if (result != null) { // 獲取最大的offset long maxOffsetInLogicQueue = result.getByteBuffer().getLong(); result.release(); // 判斷是否銷燬 如果小於offset 就要銷燬 destroy = maxOffsetInLogicQueue < offset; if (destroy) { log.info("physic min offset " + offset + ", logics in current mappedFile max offset " + maxOffsetInLogicQueue + ", delete it"); } } else if (!mappedFile.isAvailable()) { // Handle hanged file. log.warn("Found a hanged consume queue file, attempting to delete it."); destroy = true; } else { log.warn("this being not executed forever."); break; } // 進行銷燬 if (destroy && mappedFile.destroy(1000 * 60)) { files.add(mappedFile); deleteCount++; } else { break; } } } // 刪除參照 deleteExpiredFile(files); return deleteCount; }
它的刪除跟commitlog
的差不多,只不過commitlog 是根據時間來判斷的,它是根據commitlog 的offset 來判斷的,判斷要不要刪除這個MappedFile,如果這個MappedFile最後一個unit 儲存的offset 小於 commitlog 最小的offset 的話就要銷燬了。接著就是銷燬,超時時間是1分鐘,最後是刪除參照。
最後我們來看下 indexFile
的清理工作: DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:
private void deleteExpiredFiles() { // 刪除間隔 100 int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval(); // 獲取 commitLog 的最小offset long minOffset = DefaultMessageStore.this.commitLog.getMinOffset(); if (minOffset > this.lastPhysicalMinOffset) { ... // todo 刪除 過期的 indexFile DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); } }
/** * 刪除 過期檔案 * @param offset 最小的offset 小於這個offset都要刪除 */ public void deleteExpiredFile(long offset) { Object[] files = null; try { // 獲取讀鎖 this.readWriteLock.readLock().lock(); if (this.indexFileList.isEmpty()) { return; } // 獲取第一個indexFile 的一個offset long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset(); if (endPhyOffset < offset) { files = this.indexFileList.toArray(); } } catch (Exception e) { log.error("destroy exception", e); } finally { this.readWriteLock.readLock().unlock(); } if (files != null) { // 找到需要刪除的indexFile List<IndexFile> fileList = new ArrayList<IndexFile>(); for (int i = 0; i < (files.length - 1); i++) { IndexFile f = (IndexFile) files[i]; if (f.getEndPhyOffset() < offset) { fileList.add(f); } else { break; } } // 刪除 this.deleteExpiredFile(fileList); } }
可以看到,先是拿第一個indexFile
看看有沒有小於commitlog
最小offset
的情況發生,這裡也是拿的indexFile
最後一個offset
做的對比,因為這塊也是按照offset
大小 前後順序處理的,最後一個的offest
肯定是這個indexFile
中最大的了,如果第一個indexFile
滿足了的話,就會拿到所有參照,然後遍歷找出符合條件的indexFile
, 呼叫deleteExpiredFile
方法遍歷銷燬:
private void deleteExpiredFile(List<IndexFile> files) { if (!files.isEmpty()) { try { this.readWriteLock.writeLock().lock(); for (IndexFile file : files) { // 銷燬 boolean destroyed = file.destroy(3000); // 從index 集合中移除 destroyed = destroyed && this.indexFileList.remove(file); if (!destroyed) { log.error("deleteExpiredFile remove failed."); break; } } } catch (Exception e) { log.error("deleteExpiredFile has exception.", e); } finally { this.readWriteLock.writeLock().unlock(); } } }
這裡就是遍歷銷燬,然後移除對這個indexFile
管理。
本文主要是介紹了RocketMQ broker 訊息清理機制
,介紹了主要清理哪些檔案 :commitlog ,ConsumeQueue,indexFile
接著就是介紹了什麼時候觸發清理,比如說凌晨4點 ,磁碟沒滿85% 以上的話,就是清理72小時之前的,如果是滿了85%就除了還在用著的那個先清10個看看, 還有就是磁碟使用空間75% 以上也是會觸發的, 低於85 % 清理72小時之前的,高於85% 先清理10個檔案看看,這是commitlog的清理機制,關於ConsumeQueue與indexFile的話,就是與commitlog中最小的那個offset 有關了,小於commitlog中最小offset 的那些還是要清理掉的。 最後就是分別解析了一下commitlog 檔案清理,ConsumeQueue 檔案清理與indexFile 檔案清理。
參考
以上就是RocketMQ broker檔案清理原始碼解析的詳細內容,更多關於RocketMQ broker檔案清理的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45