首頁 > 軟體

Netty分散式ByteBuf使用的回收邏輯剖析

2022-03-28 19:00:04

前文傳送門:ByteBuf使用subPage級別記憶體分配

ByteBuf回收

之前的章節我們提到過, 堆外記憶體是不受jvm垃圾回收機制控制的, 所以我們分配一塊堆外記憶體進行ByteBuf操作時, 使用完畢要對物件進行回收, 這一小節, 就以PooledUnsafeDirectByteBuf為例講解有關記憶體分配的相關邏輯

PooledUnsafeDirectByteBuf中記憶體釋放的入口方法是其父類別AbstractReferenceCountedByteBuf中的release方法:

@Override
 public boolean release() {
     return release0(1);
 }

這裡呼叫了release0, 跟進去

private boolean release0(int decrement) {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt < decrement) {
            throw new IllegalReferenceCountException(refCnt, -decrement);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) { 
            if (refCnt == decrement) { 
                deallocate();
                return true;
            }
            return false;
        }
    }
}

 if (refCnt == decrement) 中判斷當前byteBuf是否沒有被參照了, 如果沒有被參照, 則通過deallocate()方法進行釋放

因為我們是以PooledUnsafeDirectByteBuf為例, 所以這裡會呼叫其父類別PooledByteBuf的deallocate方法:

protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        chunk.arena.free(chunk, handle, maxLength, cache);
        recycle();
    }
}

this.handle = -1表示當前的ByteBuf不再指向任何一塊記憶體

memory = null這裡將memory也設定為null

chunk.arena.free(chunk, handle, maxLength, cache)這一步是將ByteBuf的記憶體進行釋放

recycle()是將物件放入的物件回收站, 迴圈利用

我們首先分析free方法

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
    //是否為unpooled
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        //那種級別的Size
        SizeClass sizeClass = sizeClass(normCapacity);
        //加到快取裡
        if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
            return;
        }
        //將快取物件標記為未使用
        freeChunk(chunk, handle, sizeClass);
    }
}

首先判斷是不是unpooled, 我們這裡是Pooled, 所以會走到else塊中:

sizeClass(normCapacity)計算是哪種級別的size, 我們按照tiny級別進行分析

cache.add(this, chunk, handle, normCapacity, sizeClass)是將當前當前ByteBuf進行快取

我們之前講過, 再分配ByteBuf時首先在快取上分配, 而這步, 就是將其快取的過程, 跟進去:

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
    //拿到MemoryRegionCache節點
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    if (cache == null) {
        return false;
    }
    //將chunk, 和handle封裝成實體加到queue裡面
    return cache.add(chunk, handle);
}

首先根據根據型別拿到相關型別快取節點, 這裡會根據不同的記憶體規格去找不同的物件, 我們簡單回顧一下, 每個快取物件都包含一個queue, queue中每個節點是entry, 每一個entry中包含一個chunk和handle, 可以指向唯一的連續的記憶體

我們跟到cache中

private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
    switch (sizeClass) {
    case Normal:
        return cacheForNormal(area, normCapacity);
    case Small:
        return cacheForSmall(area, normCapacity);
    case Tiny:
        return cacheForTiny(area, normCapacity);
    default:
        throw new Error();
    }
}

假設我們是tiny型別, 這裡就會走到cacheForTiny(area, normCapacity)方法中, 跟進去:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { 
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

這個方法我們之前剖析過, 就是根據大小找到第幾個快取中的第幾個快取, 拿到下標之後, 通過cache去超相對應的快取物件:  

private static <T>  MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    if (cache == null || idx > cache.length - 1) {
        return null;
    }
    return cache[idx];
}

我們這裡看到, 是直接通過下標拿的快取物件

回到add方法中

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
    //拿到MemoryRegionCache節點
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    if (cache == null) {
        return false;
    }
    //將chunk, 和handle封裝成實體加到queue裡面
    return cache.add(chunk, handle);
}

這裡的cache物件呼叫了一個add方法, 這個方法就是將chunk和handle封裝成一個entry加到queue裡面

我們跟到add方法中:

public final boolean add(PoolChunk<T> chunk, long handle) {
    Entry<T> entry = newEntry(chunk, handle); 
    boolean queued = queue.offer(entry);
    if (!queued) {
        entry.recycle();
    }
    return queued;
}

我們之前介紹過, 從在快取中分配的時候從queue彈出一個entry, 會放到一個物件池裡面, 而這裡Entry<T> entry = newEntry(chunk, handle)就是從物件池裡去取一個entry物件, 然後將chunk和handle進行賦值

然後通過queue.offer(entry)加到queue中

我們回到free方法中

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
    //是否為unpooled
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        //那種級別的Size
        SizeClass sizeClass = sizeClass(normCapacity);
        //加到快取裡
        if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
            return;
        } 
        freeChunk(chunk, handle, sizeClass);
    }
}

這裡加到快取之後, 如果成功, 就會return, 如果不成功, 就會呼叫freeChunk(chunk, handle, sizeClass)方法, 這個方法的意義是, 將原先給ByteBuf分配的記憶體區段標記為未使用

跟進freeChunk簡單分析下:

void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
    final boolean destroyChunk;
    synchronized (this) {
        switch (sizeClass) {
        case Normal:
            ++deallocationsNormal;
            break;
        case Small:
            ++deallocationsSmall;
            break;
        case Tiny:
            ++deallocationsTiny;
            break;
        default:
            throw new Error();
        }
        destroyChunk = !chunk.parent.free(chunk, handle);
    }
    if (destroyChunk) {
        destroyChunk(chunk);
    }
}

我們再跟到free方法中:

boolean free(PoolChunk<T> chunk, long handle) {
    chunk.free(handle);
    if (chunk.usage() < minUsage) {
        remove(chunk);
        return move0(chunk);
    }
    return true;
}

chunk.free(handle)的意思是通過chunk釋放一段連續的記憶體

再跟到free方法中:

void free(long handle) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);

    if (bitmapIdx != 0) { 
        PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
        assert subpage != null && subpage.doNotDestroy;
        PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
        synchronized (head) {
            if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
                return;
            }
        }
    }
    freeBytes += runLength(memoryMapIdx);
    setValue(memoryMapIdx, depth(memoryMapIdx));
    updateParentsFree(memoryMapIdx);
}

 if (bitmapIdx != 0)這 裡判斷是當前緩衝區分配的級別是Page還是Subpage, 如果是Subpage, 則會找到相關的Subpage將其點陣圖示記為0

如果不是subpage, 這裡通過分配記憶體的反向標記, 將該記憶體標記為未使用

這段邏輯可以讀者自行分析, 如果之前分配相關的知識掌握紮實的話, 這裡的邏輯也不是很難

回到PooledByteBuf的deallocate方法中:

protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        chunk.arena.free(chunk, handle, maxLength, cache);
        recycle();
    }
}

最後, 通過recycle()將釋放的ByteBuf放入物件回收站, 有關物件回收站的知識, 會在以後的章節進行剖析

以上就是記憶體回收的大概邏輯,更多關於Netty分散式ByteBuf使用回收的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com