首頁 > 軟體

Netty分散式ByteBuf中PooledByteBufAllocator剖析

2022-03-28 16:00:26

前言

上一小節簡單介紹了ByteBufAllocator以及其子類UnPooledByteBufAllocator的緩衝區分類的邏輯, 這一小節開始帶大家剖析更為複雜的PooledByteBufAllocator, 我們知道PooledByteBufAllocator是通過自己取一塊連續的記憶體進行ByteBuf的封裝, 所以這裡更為複雜, 在這一小節簡單講解有關PooledByteBufAllocator分配邏輯

友情提示:  從這一節開始難度開始加大, 請各位戰友做好心理準備

PooledByteBufAllocator分配邏輯

PooledByteBufAllocator同樣也重寫了AbstractByteBuf的newDirectBuffer和newHeapBuffer兩個抽象方法, 我們這一小節以newDirectBuffer為例, 先簡述一下其邏輯

邏輯簡述

首先看UnPooledByteBufAllocator中newDirectBuffer這個方法

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    ByteBuf buf;
    if (directArena != null) { 
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }
    return toLeakAwareBuffer(buf);
}

首先 PoolThreadCache cache = threadCache.get() 這一步是拿到一個執行緒區域性快取物件, 執行緒區域性快取, 顧明思議, 就是同一個執行緒共用的一個快取

threadCache是PooledByteBufAllocator類的一個成員變數, 型別是PoolThreadLocalCache(這兩個非常容易混淆, 切記):

private final PoolThreadLocalCache threadCache;

再看其型別PoolThreadLocalCache的定義:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    @Override
    protected synchronized PoolThreadCache initialValue() {
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
        return new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, 
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }
    //程式碼省略
}

這裡繼承了一個FastThreadLocal類, 這個類相當於jdk的ThreadLocal, 只是效能更快, 有關FastThreadLocal, 我們在後面的章節會詳細剖析, 這裡我們只要知道, 繼承FastThreadLocal類並且重寫了initialValue方法, 則通過其get方法就能獲得initialValue返回的物件, 並且這個物件是執行緒共用的

在這裡我們看到, 在重寫的initialValue方法中, 初始化了heapArena和directArena兩個屬性之後, 通過new PoolThreadCache()這種方式建立了PoolThreadCache物件

這裡注意, PoolThreadLocalCache是一個FastThreadLocal, 而PoolThreadCache才是執行緒區域性快取, 這兩個類名非常非常像, 千萬別搞混了(我當初讀這段程式碼時因為搞混所以懵逼了)

其中heapArena和directArena是分別是用來分配堆和堆外記憶體用的兩個物件, 以directArena為例, 我們看到是通過leastUsedArena(directArenas)這種方式獲得的, directArenas是一個directArena型別的陣列, leastUsedArena(directArenas)這個方法是用來獲取陣列中一個使用最少的directArena物件

directArenas是PooledByteBufAllocator的成員變數, 是在其構造方法中初始化的:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, 
                              int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
    //程式碼省略
    if (nDirectArena > 0) {
        directArenas = newArenaArray(nDirectArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
        for (int i = 0; i < directArenas.length; i ++) {
            PoolArena.DirectArena arena = new PoolArena.DirectArena(
                    this, pageSize, maxOrder, pageShifts, chunkSize);
            directArenas[i] = arena;
            metrics.add(arena);
        }
        directArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        directArenas = null;
        directArenaMetrics = Collections.emptyList();
    }
}

我們看到這裡通過directArenas = newArenaArray(nDirectArena)初始化了directArenas, 其中nDirectArena, 預設是cpu核心數的2倍, 這點我們可以跟蹤構造方法的呼叫鏈可以分析到

這樣保證了每一個執行緒會有一個獨享的arena

我們看newArenaArray(nDirectArena)這個方法:

private static <T> PoolArena<T>[] newArenaArray(int size) {
    return new PoolArena[size];
}

這裡只是建立了一個陣列, 預設長度為nDirectArena

繼續跟PooledByteBufAllocator的構造方法, 建立完了陣列, 後面在for迴圈中為陣列賦值:

首先通過new PoolArena.DirectArena建立一個DirectArena範例, 然後再為新建立的directArenas陣列賦值

再回到PoolThreadLocalCache的構造方法中:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    @Override
    protected synchronized PoolThreadCache initialValue() {
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
        return new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, 
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }
    //程式碼省略
}

方法最後, 建立PoolThreadCache的一個物件, 我們跟進構造方法中:

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, 
                int tinyCacheSize, int smallCacheSize, int normalCacheSize, 
                int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
    //程式碼省略
    //儲存成兩個成員變數
    this.heapArena = heapArena;
    this.directArena = directArena;
    //程式碼省略
}

這裡省略了大段程式碼, 只需要關注這裡將兩個值儲存在PoolThreadCache的成員變數中

我們回到newDirectBuffer中

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    ByteBuf buf;
    if (directArena != null) { 
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }
    return toLeakAwareBuffer(buf);
}

簡單分析的執行緒區域性快取初始化相關邏輯, 我們再往下跟:

PoolArena<ByteBuffer> directArena = cache.directArena;

通過上面的分析, 這步我們應該不陌生, 在PoolThreadCache構造方法中將directArena和heapArena中儲存在成員變數中, 這樣就可以直接通過cache.directArena這種方式拿到其成員變數的內容

從以上邏輯, 我們可以大概的分析一下流程, 通常會建立和執行緒數量相等的arena, 並以陣列的形式儲存在PooledByteBufAllocator的成員變數中, 每一個PoolThreadCache建立的時候, 都會在當前執行緒拿到一個arena, 並儲存在自身的成員變數中

PoolThreadCache除了維護了一個arena之外, 還維護了一個快取列表, 我們在重複分配ByteBuf的時候, 並不需要每次都通過arena進行分配, 可以直接從快取列表中拿一個ByteBuf

有關快取列表, 我們循序漸進的往下看

在PooledByteBufAllocator中維護了三個值:

1.  tinyCacheSize

2.  smallCacheSize

3.  normalCacheSize

tinyCacheSize代表tiny型別的ByteBuf能快取多少個

smallCacheSize代表small型別的ByteBuf能快取多少個

normalCacheSize代表normal型別的ByteBuf能快取多少個

具體tiny型別, small型別, normal是什麼意思, 我們會在後面講解

我們回到PoolThreadLocalCache類中看其構造方法:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    @Override
    protected synchronized PoolThreadCache initialValue() {
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
        return new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, 
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }
    //程式碼省略
}

我們看到這三個屬性是在PoolThreadCache的構造方法中傳入的

這三個屬性是通過PooledByteBufAllocator的構造方法中初始化的, 跟隨構造方法的呼叫鏈會走到這個構造方法:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
    this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, 
            DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
}

這裡仍然呼叫了一個過載的構造方法, 這裡我們關注這幾個引數:

DEFAULT_TINY_CACHE_SIZE,

DEFAULT_SMALL_CACHE_SIZE,

DEFAULT_NORMAL_CACHE_SIZE

這裡對應著幾個靜態的成員變數:

private static final int DEFAULT_TINY_CACHE_SIZE;
private static final int DEFAULT_SMALL_CACHE_SIZE;
private static final int DEFAULT_NORMAL_CACHE_SIZE;

我們在static塊中看其初始化過程

static{
    //程式碼省略
    DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
    DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
    DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
    //程式碼省略
}

在這裡我們看到, 這三個屬性分別初始化的大小是512, 256, 64, 這三個屬性就對應了PooledByteBufAllocator另外的幾個成員變數, tinyCacheSize, smallCacheSize, normalCacheSize

也就是說, tiny型別的ByteBuf在每個快取中預設快取的數量是512個, small型別的ByteBuf在每個快取中預設快取的數量是256個, normal型別的ByteBuf在每個快取中預設快取的數量是64個

我們再到PooledByteBufAllocator中過載的構造方法中:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, 
                              int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
    super(preferDirect);
    threadCache = new PoolThreadLocalCache();
    this.tinyCacheSize = tinyCacheSize;
    this.smallCacheSize = smallCacheSize;
    this.normalCacheSize = normalCacheSize;
    //程式碼省略
}

篇幅原因, 這裡也省略了大段程式碼, 大家可以通過構造方法引數找到原始碼中相對的位置進行閱讀

我們關注這段程式碼:

this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;

在這裡將將引數的

DEFAULT_TINY_CACHE_SIZE,

DEFAULT_SMALL_CACHE_SIZE,

DEFAULT_NORMAL_CACHE_SIZE

的三個值儲存到了成員變數

tinyCacheSize,

smallCacheSize,

normalCacheSize

PooledByteBufAllocator中將這三個成員變數初始化之後, 在PoolThreadLocalCache的initialValue方法中就可以使用這三個成員變數的值了

我們再次跟到initialValue方法中

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    @Override
    protected synchronized PoolThreadCache initialValue() {
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
        return new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, 
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }
    //程式碼省略
}

這裡就可以在建立PoolThreadCache物件的的構造方法中傳入tinyCacheSize, smallCacheSize, normalCacheSize這三個成員變數了

我們再跟到PoolThreadCache的構造方法中:

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, 
                int tinyCacheSize, int smallCacheSize, int normalCacheSize, 
                int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
    //程式碼省略
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
    this.heapArena = heapArena;
    this.directArena = directArena;
    if (directArena != null) {
        tinySubPageDirectCaches = createSubPageCaches(
                tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
        smallSubPageDirectCaches = createSubPageCaches(
                smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

        numShiftsNormalDirect = log2(directArena.pageSize);
        normalDirectCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, directArena);

        directArena.numThreadCaches.getAndIncrement();
    } else {
        //程式碼省略
    }
    //程式碼省略
    ThreadDeathWatcher.watch(thread, freeTask);
}

其中tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches就代表了三種型別的快取陣列, 陣列元素是MemoryRegionCache型別的物件, MemoryRegionCache就代表一個ByeBuf的快取

以tinySubPageDirectCaches為例, 我們看到tiny型別的快取是通過createSubPageCaches這個方法建立的

這裡傳入了三個引數tinyCacheSize我們之前分析過是512, PoolArena.numTinySubpagePools這裡是32(這裡不同型別的快取大小不一樣, small型別是4, normal型別是3) , SizeClass.Tiny代表其型別是tiny型別

我們跟到createSubPageCaches這個方法中

private static <T> MemoryRegionCache<T>[] createSubPageCaches(
        int cacheSize, int numCaches, SizeClass sizeClass) {
    if (cacheSize > 0) {
        //建立陣列, 長度為32
        @SuppressWarnings("unchecked")
        MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
        for (int i = 0; i < cache.length; i++) {
            //每一個節點是ubPageMemoryRegionCache物件
            cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
        }
        return cache;
    } else {
        return null;
    }
}

這裡首先建立了MemoryRegionCache, 長度是我們剛才分析過的32

然後通過for迴圈, 為陣列賦值, 賦值的物件是SubPageMemoryRegionCache型別的, SubPageMemoryRegionCache就是MemoryRegionCache型別的子類, 同樣也是一個快取物件, 構造方法中, cacheSize, 就是其中快取物件的數量, 如果是tiny型別就是512, sizeClass, 代表其型別, 比如tiny, small或者normal

再簡單跟到其構造方法:

SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
    super(size, sizeClass);
}

這裡呼叫了父類別的構造方法, 我們繼續跟進去:

MemoryRegionCache(int size, SizeClass sizeClass) {
     //size會進行規格化
     this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
     //佇列大小
     queue = PlatformDependent.newFixedMpscQueue(this.size);
     this.sizeClass = sizeClass;
 }

首先會對其進行規格化, 其實就是查詢大於等於當前size的2的冪次方的數, 這裡如果是512那麼規格化之後還是512, 然後初始化一個佇列, 佇列大小就是傳入的大小, 如果是tiny, 這裡大小就是512

最後並儲存其型別

這裡我們不難看出, 其實每個快取的物件, 裡面是通過一個佇列儲存的, 有關快取佇列和ByteBuf之間的邏輯, 後面的小節會進行剖析

從上面剖析我們不難看出, PoolThreadCache中維護了三種型別的快取陣列, 每個快取陣列中的每個值中, 又通過一個佇列進行物件的儲存

當然這裡只舉了Direct型別的物件關係, heap型別其實都是一樣的, 這裡不再贅述

這一小節邏輯較為複雜, 同學們可以自己在原始碼中跟蹤一遍加深印象

以上就是Netty分散式ByteBuf中PooledByteBufAllocator剖析的詳細內容,更多關於Netty分散式ByteBuf PooledByteBufAllocato的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com