首頁 > 軟體

Netty分散式ByteBuf使用directArena分配緩衝區過程解析

2022-03-28 16:00:07

上一小節簡單分析了PooledByteBufAllocator中, 執行緒區域性快取和arean的相關邏輯, 這一小節簡單分析下directArena分配緩衝區的相關過程

directArena分配緩衝區

回到newDirectBuffer中

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    ByteBuf buf;
    if (directArena != null) { 
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }
    return toLeakAwareBuffer(buf);
}

獲取了directArena物件之後, 通過allocate方法分配一個ByteBuf, 這裡allocate方法是PoolArena類中的方法

跟到allocate方法中:

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { 
    PooledByteBuf<T> buf = newByteBuf(maxCapacity); 
    allocate(cache, buf, reqCapacity);
    return buf;
}

首先通過newByteBuf獲得一個ByteBuf物件

再通過allocate方法進行分配, 這裡要注意, 這裡進行分配的時候是執行緒私有的directArena進行分配

我們跟到newByteBuf方法中

因為是directArena呼叫的newByteBuf, 所以這裡會進入DirectArena類的newByteBuf中:

protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) { 
    if (HAS_UNSAFE) { 
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else {
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}

因為預設通常是有unsafe物件的, 所以這裡會走到這一步中PooledUnsafeDirectByteBuf.newInstance(maxCapacity)

通過靜態方法newInstance建立一個PooledUnsafeDirectByteBuf物件

跟到newInstance方法中:

static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
    PooledUnsafeDirectByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}

這裡通過RECYCLER.get()這種方式拿到一個ByteBuf物件, RECYCLER其實是一個物件回收站, 這部分內容會在後面的內容中詳細剖析, 這裡我們只需要知道, 這種方式能從回收站中拿到一個物件, 如果回收站裡沒有相關物件, 則建立一個新

因為這裡有可能是從回收站中拿出的一個物件, 所以通過reuse進行復用

跟到reuse方法中

final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}

這裡設定了的最大可擴容記憶體, 物件的參照數量, 讀寫指標位置都重置為0, 以及讀寫指標的位置標記也都重置為0

我們回到PoolArena的allocate方法中:

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { 
    PooledByteBuf<T> buf = newByteBuf(maxCapacity); 
    allocate(cache, buf, reqCapacity);
    return buf;
}

拿到了ByteBuf物件, 就可以通過allocate(cache, buf, reqCapacity)方法進行記憶體分配了

跟到allocate方法中

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    //規格化
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { 
        int tableIdx;
        PoolSubpage<T>[] table;
        //判斷是不是tinty
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            //快取分配
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                return;
            }
            //通過tinyIdx拿到tableIdx
            tableIdx = tinyIdx(normCapacity);
            //subpage的陣列
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }
        //拿到對應的節點
        final PoolSubpage<T> head = table[tableIdx];

        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            //預設情況下, head的next也是自身
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

                if (tiny) {
                    allocationsTiny.increment();
                } else {
                    allocationsSmall.increment();
                }
                return;
            }
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    if (normCapacity <= chunkSize) {
        //首先在快取上進行記憶體分配
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            //分配成功, 返回
            return;
        }
        //分配不成功, 做實際的記憶體分配
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        //大於這個值, 就不在快取上分配
        allocateHuge(buf, reqCapacity);
    }
}

這裡看起來邏輯比較長, 其實主要步驟分為兩步

1.首先在快取上進行分配

對應步驟是:

  cache.allocateTiny(this, buf, reqCapacity, normCapacity)

  cache.allocateSmall(this, buf, reqCapacity, normCapacity)

  cache.allocateNormal(this, buf, reqCapacity, normCapacity)

2.如果在快取上分配不成功, 則實際分配一塊記憶體

對應步驟是

  allocateNormal(buf, reqCapacity, normCapacity)

在這裡對幾種型別的記憶體進行介紹:

之前的小節我們介紹過, 緩衝區記憶體型別分為tiny, small, 和normal, 其實還有種不常見的型別叫做huge, 那麼這幾種型別的記憶體有什麼區別呢, 實際上這幾種型別是按照緩衝區初始化空間的範圍進行區分的, 具體區分如下:

tiny型別對應的緩衝區範圍為0-512B

small型別對應的緩衝區範圍為512B-8K

normal型別對應的緩衝區範圍為8K-16MB

huge型別對應緩衝區範圍為大於16MB

簡單介紹下有關範圍的含義:

16MB對應一個chunk, netty是以chunk為單位向作業系統申請記憶體的

8k對應一個page, page是將chunk切分後的結果, 一個chunk對應2048個page

8k以下對應一個subpage, subpage是page的切分, 一個page可以切分多個subpage, 具體切分幾個需要根據subpage的大小而定, 比如只要分配1k的緩衝區, 則會將page切分成8個subpage

以上就是directArena記憶體分配的大概流程和相關概念,更多關於Netty分散式ByteBuf directArena分配緩衝區的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com