首頁 > 軟體

Netty分散式ByteBuf使用page級別的記憶體分配解析

2022-03-28 19:00:40

前面小節我們剖析過命中快取的記憶體分配邏輯, 前提是如果快取中有資料, 那麼快取中沒有資料, netty是如何開闢一塊記憶體進行記憶體分配的呢?這一小節帶大家進行剖析:

netty記憶體分配資料結構

之前我們介紹過, netty記憶體分配的單位是chunk, 一個chunk的大小是16MB, 實際上每個chunk, 都以雙向連結串列的形式儲存在一個chunkList中, 而多個chunkList, 同樣也是雙向連結串列進行關聯的, 大概結構如下所示:

在chunkList中, 是根據chunk的記憶體使用率歸到一個chunkList中, 這樣, 在記憶體分配時, 會根據百分比找到相應的chunkList, 在chunkList中選擇一個chunk進行記憶體分配 

我們看PoolArena中有關chunkList的成員變數

private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;

這裡總共定義了6個chunkList, 並在構造方法將其進行初始化

跟到其構造方法中:

protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    //程式碼省略
    q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE, chunkSize);
    q075 = new PoolChunkList<T>(q100, 75, 100, chunkSize); 
    q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize);
    q025 = new PoolChunkList<T>(q050, 25, 75, chunkSize);
    q000 = new PoolChunkList<T>(q025, 1, 50, chunkSize);
    qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25, chunkSize);
    //用雙向連結串列的方式進行連線
    q100.prevList(q075);
    q075.prevList(q050);
    q050.prevList(q025);
    q025.prevList(q000);
    q000.prevList(null);
    qInit.prevList(qInit);
    //程式碼省略
}

首先通過new PoolChunkList()這種方式將每個chunkList進行建立, 我們以 q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize) 為例進行簡單的介紹

q075表示當前q50的下一個節點是q075, 剛才我們講過ChunkList是通過雙向連結串列進行關聯的, 所以這裡不難理解

引數50和100表示當前chunkList中儲存的chunk的記憶體使用率都在50%到100%之間, 最後chunkSize為其設定大小

建立完ChunkList之後, 再設定其上一個節點, q050.prevList(q025)為例, 這裡代表當前chunkList的上一個節點是q025

以這種方式建立完成之後, chunkList的節點關係變成了如下圖所示:

netty中, chunk又包含了多個page, 每個page的大小為8k, 如果要分配16k的記憶體, 則在在chunk中找到連續的兩個page就可以分配, 對應關係如下:

很多場景下, 為緩衝區分配8k的記憶體也是一種浪費, 比如只需要分配2k的緩衝區, 如果使用8k會造成6k的浪費, 這種情況, netty又會將page切分成多個subpage, 每個subpage大小要根據分配的緩衝區大小而指定, 比如要分配2k的記憶體, 就會將一個page切分成4個subpage, 每個subpage的大小為2k, 如圖:

我們看PoolSubpage的屬性

final PoolChunk<T> chunk;
private final int memoryMapIdx;
private final int runOffset;
private final int pageSize; 
private final long[] bitmap;
PoolSubpage<T> prev;
PoolSubpage<T> next;
boolean doNotDestroy; 
int elemSize;

chunk代表其子頁屬於哪個chunk

bitmap用於記錄子頁的記憶體分配情況

prev和next, 代表子頁是按照雙向連結串列進行關聯的, 這裡分別指向上一個和下一個節點

elemSize屬性, 代表的就是這個子頁是按照多大記憶體進行劃分的, 如果按照1k劃分, 則可以劃分出8個子頁

簡單介紹了記憶體分配的資料結構, 我們開始剖析netty在page級別上分配記憶體的流程:

我們回到PoolArena的allocate方法

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    //規格化
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { 
        int tableIdx;
        PoolSubpage<T>[] table;
        //判斷是不是tinty
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            //快取分配
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                return;
            }
            //通過tinyIdx拿到tableIdx
            tableIdx = tinyIdx(normCapacity);
            //subpage的陣列
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        //拿到對應的節點
        final PoolSubpage<T> head = table[tableIdx];

        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            //預設情況下, head的next也是自身
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

                if (tiny) {
                    allocationsTiny.increment();
                } else {
                    allocationsSmall.increment();
                }
                return;
            }
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    if (normCapacity <= chunkSize) {
        //首先在快取上進行記憶體分配
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            //分配成功, 返回
            return;
        }
        //分配不成功, 做實際的記憶體分配
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        //大於這個值, 就不在快取上分配
        allocateHuge(buf, reqCapacity);
    }
}

我們之前講過, 如果在快取中分配不成功, 則會開闢一塊連續的記憶體進行緩衝區分配, 這裡我們先跳過isTinyOrSmall(normCapacity)往後的程式碼, 下一小節進行分析

首先 if (normCapacity <= chunkSize) 說明其小於16MB, 然後首先在快取中分配, 因為最初快取中沒有值, 所以會走到allocateNormal(buf, reqCapacity, normCapacity), 這裡實際上就是在page級別上進行分配, 分配一個或者多個page的空間

我們跟進allocateNormal

private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    //首先在原來的chunk上進行記憶體分配(1)
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        ++allocationsNormal;
        return;
    }
    //建立chunk進行記憶體分配(2)
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    long handle = c.allocate(normCapacity);
    ++allocationsNormal;
    assert handle > 0;
    //初始化byteBuf(3)
    c.initBuf(buf, handle, reqCapacity); 
    qInit.add(c);
}

這裡主要拆解了如下步驟

1. 在原有的chunk中進行分配

2. 建立chunk進行分配

3. 初始化ByteBuf

首先我們看第一步, 在原有的chunk中進行分配:

if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
    q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
    q075.allocate(buf, reqCapacity, normCapacity)) {
    ++allocationsNormal;
    return;
}

我們之前講過, chunkList是儲存不同記憶體使用量的chunk集合, 每個chunkList通過雙向連結串列的形式進行關聯, 這裡的q050.allocate(buf, reqCapacity, normCapacity)就代表首先在q050這個chunkList上進行記憶體分配

我們以q050為例進行分析, 跟到q050.allocate(buf, reqCapacity, normCapacity)方法中:

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    if (head == null || normCapacity > maxCapacity) {
        return false;
    }
    //從head節點往下遍歷
    for (PoolChunk<T> cur = head;;) {
        long handle = cur.allocate(normCapacity);
        if (handle < 0) {
            cur = cur.next;
            if (cur == null) {
                return false;
            }
        } else {
            cur.initBuf(buf, handle, reqCapacity);
            if (cur.usage() >= maxUsage) {
                remove(cur);
                nextList.add(cur);
            }
            return true;
        }
    }
}

首先會從head節點往下遍歷

 long handle = cur.allocate(normCapacity) 

表示對於每個chunk, 都嘗試去分配

 if (handle < 0) 說明沒有分配到, 則通過cur = cur.next找到下一個節點繼續進行分配, 我們講過chunk也是通過雙向連結串列進行關聯的, 所以對這塊邏輯應該不會陌生

如果handle大於0說明已經分配到了記憶體, 則通過cur.initBuf(buf, handle, reqCapacity)對byteBuf進行初始化

 if (cur.usage() >= maxUsage) 代表當前chunk的記憶體使用率大於其最大使用率, 則通過remove(cur)從當前的chunkList中移除, 再通過nextList.add(cur)新增到下一個chunkList中

我們再回到PoolArena的allocateNormal方法中:

我們看第二步

PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize)

這裡的引數pageSize是8192, 也就是8k

maxOrder為11

pageShifts為13,   2的13次方正好是8192, 也就是8k

chunkSize為16777216, 也就是16MB

這裡的引數值可以通過debug的方式跟蹤到

因為我們的範例是堆外記憶體, newChunk(pageSize, maxOrder, pageShifts, chunkSize)所以會走到DirectArena的newChunk方法中:

protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    return new PoolChunk<ByteBuffer>( 
            this, allocateDirect(chunkSize), 
            pageSize, maxOrder, pageShifts, chunkSize);
}

這裡直接通過建構函式建立了一個chunk

allocateDirect(chunkSize)這裡是通過jdk的api的申請了一塊直接記憶體, 我們跟到PoolChunk的建構函式中:

PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    unpooled = false;
    this.arena = arena;
    //memeory為一個ByteBuf
    this.memory = memory;
    //8k
    this.pageSize = pageSize;
    //13
    this.pageShifts = pageShifts;
    //11
    this.maxOrder = maxOrder;
    this.chunkSize = chunkSize;
    unusable = (byte) (maxOrder + 1);
    log2ChunkSize = log2(chunkSize);
    subpageOverflowMask = ~(pageSize - 1);
    freeBytes = chunkSize;
    assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
    maxSubpageAllocs = 1 << maxOrder;
    //節點數量為4096
    memoryMap = new byte[maxSubpageAllocs << 1];
    //也是4096個節點
    depthMap = new byte[memoryMap.length];
    int memoryMapIndex = 1;
    //d相當於一個深度, 賦值的內容代表當前節點的深度
    for (int d = 0; d <= maxOrder; ++ d) {
        int depth = 1 << d;
        for (int p = 0; p < depth; ++ p) {
            memoryMap[memoryMapIndex] = (byte) d;
            depthMap[memoryMapIndex] = (byte) d;
            memoryMapIndex ++;
        }
    }
    subpages = newSubpageArray(maxSubpageAllocs);
}

首先將引數傳入的值進行賦值

 this.memory = memory 就是將引數中建立的堆外記憶體進行儲存, 就是chunk所指向的那塊連續的記憶體, 在這個chunk中所分配的ByteBuf, 都會在這塊記憶體中進行讀寫

我們重點關注 memoryMap = new byte[maxSubpageAllocs << 1] 

和 depthMap = new byte[memoryMap.length] 這兩步

首先看 memoryMap = new byte[maxSubpageAllocs << 1] 

這裡初始化了一個位元組陣列memoryMap, 大小為maxSubpageAllocs << 1, 也就是4096

 depthMap = new byte[memoryMap.length] 同樣也是初始化了一個位元組陣列, 大小為memoryMap的大小, 也就是4096

繼續往下分析之前, 我們看chunk的一個層級關係

這是一個二元樹的結構, 左側的數位代表層級, 右側代表一塊連續的記憶體, 每個父節點下又拆分成多個子節點, 最頂層表示的記憶體範圍為0-16MB, 其又下分為兩層, 範圍為0-8MB, 8-16MB, 以此類推, 最後到11層, 以8k的大小劃分, 也就是一個page的大小

如果我們分配一個8mb的緩衝區, 則會將第二層的第一個節點, 也就是0-8這個連續的記憶體進行分配, 分配完成之後, 會將這個節點設定為不可用, 具體邏輯後面會講解

結合上面的圖, 我們再看構造方法中的for迴圈:

for (int d = 0; d <= maxOrder; ++ d) {
    int depth = 1 << d;
    for (int p = 0; p < depth; ++ p) {
        memoryMap[memoryMapIndex] = (byte) d;
        depthMap[memoryMapIndex] = (byte) d;
        memoryMapIndex ++;
    }
}

實際上這個for迴圈就是將上面的結構包裝成一個位元組陣列memoryMap, 外層迴圈用於控制層數, 內層迴圈用於控制裡面每層的節點, 這裡經過迴圈之後, memoryMap和depthMap內容為以下表現形式:

[0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4...........]

這裡注意一下, 因為程式中陣列的下標是從1開始設定的, 所以第零個節點元素為預設值0

這裡數位代表層級, 同時也代表了當前層級的節點, 相同的數位個數就是這一層級的節點數

其中0為2個(因為這裡分配時下標是從1開始的, 所以第0個位置是預設值0, 實際上第零層元素只有一個, 就是頭結點), 1為2個, 2為4個, 3為8個, 4為16個, n為2的n次方個, 直到11, 也就是11有2的11次方個

我們再回到PoolArena的allocateNormal方法中

private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    //首先在原來的chunk上進行記憶體分配(1)
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        ++allocationsNormal;
        return;
    }
    //建立chunk進行記憶體分配(2)
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    long handle = c.allocate(normCapacity);
    ++allocationsNormal;
    assert handle > 0;
    //初始化byteBuf(3)
    c.initBuf(buf, handle, reqCapacity); 
    qInit.add(c);
}

我們繼續剖析 long handle = c.allocate(normCapacity) 這步

跟到allocate(normCapacity)中

long allocate(int normCapacity) {
    if ((normCapacity & subpageOverflowMask) != 0) { 
        return allocateRun(normCapacity);
    } else {
        return allocateSubpage(normCapacity);
    }
}

如果分配是以page為單位, 則走到allocateRun(normCapacity)方法中, 跟進去:

private long allocateRun(int normCapacity) {
    int d = maxOrder - (log2(normCapacity) - pageShifts); 
    int id = allocateNode(d);
    if (id < 0) {
        return id;
    }
    freeBytes -= runLength(id);
    return id;
}

 int d = maxOrder - (log2(normCapacity) - pageShifts) 表示根據normCapacity計算出圖5-8-5中的第幾層

 int id = allocateNode(d) 表示根據層級關係, 去分配一個節點, 其中id代表memoryMap中的下標

我們跟到allocateNode方法中

private int allocateNode(int d) {
    //下標初始值為1
    int id = 1;
    //代表當前層級第一個節點的初始下標
    int initial = - (1 << d);
    //獲取第一個節點的值
    byte val = value(id);
    //如果值大於層級, 說明chunk不可用
    if (val > d) { 
        return -1;
    }
    //當前下標對應的節點值如果小於層級, 或者當前下標小於層級的初始下標
    while (val < d || (id & initial) == 0) {
        //當前下標乘以2, 代表下當前節點的子節點的起始位置
        id <<= 1;
        //獲得id位置的值
        val = value(id);
        //如果當前節點值大於層數(節點不可用)
        if (val > d) {
            //id為偶數則+1, id為奇數則-1(拿的是其兄弟節點)
            id ^= 1;
            //獲取id的值
            val = value(id);
        }
    }
    byte value = value(id);
    assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d", 
            value, id & initial, d);
    //將找到的節點設定為不可用
    setValue(id, unusable); 
    //逐層往上標記被使用
    updateParentsAlloc(id);
    return id;
}

這裡是實際上是從第一個節點往下找, 找到層級為d未被使用的節點, 我們可以通過註釋體會其邏輯

找到相關節點後通過setValue將當前節點設定為不可用, 其中id是當前節點的下標, unusable代表一個不可用的值, 這裡是12, 因為我們的層級只有12層, 所以設定為12之後就相當於標記不可用

設定成不可用之後, 通過updateParentsAlloc(id)逐層設定為被使用

我們跟進updateParentsAlloc方法

private void updateParentsAlloc(int id) {
    while (id > 1) {
        //取到當前節點的父節點的id
        int parentId = id >>> 1;
        //獲取當前節點的值
        byte val1 = value(id);
        //找到當前節點的兄弟節點
        byte val2 = value(id ^ 1);
        //如果當前節點值小於兄弟節點, 則儲存當前節點值到val, 否則, 儲存兄弟節點值到val 
        //如果當前節點是不可用, 則當前節點值是12, 大於兄弟節點的值, 所以這裡將兄弟節點的值進行儲存
        byte val = val1 < val2 ? val1 : val2;
        //將val的值設定為父節點下標所對應的值
        setValue(parentId, val);
        //id設定為父節點id, 繼續迴圈
        id = parentId;
    }
}

這裡其實是將回圈將兄弟節點的值替換成父節點的值, 我們可以通過註釋仔細的進行邏輯分析

如果實在理解有困難, 我通過畫圖幫助大家理解:

簡單起見, 我們這裡只設定三層:

這裡我們模擬其分配場景, 假設只有三層, 其中index代表陣列memoryMap的下標, value代表其值, memoryMap中的值就為[0, 0, 1, 1, 2, 2, 2, 2]

我們要分配一個4MB的byteBuf, 在我們呼叫allocateNode(int d)中傳入的d是2, 也就是第二層

根據我們上面分分析的邏輯這裡會找到第二層的第一個節點, 也就是0-4mb這個節點, 找到之後將其設定為不可用, 這樣memoryMap中的值就為[0, 0, 1, 1, 12, 2, 2, 2]

二元樹的結構就會變為:

注意標紅部分, 將index為4的節點設定為了不可用

將這個節點設定為不可用之後, 則會將進行向上設定不可用, 迴圈將兄弟節點數值較小的節點替換到父節點, 也就是將index為2的節點的值替換成了index的為5的節點的值, 這樣陣列的值就會變為[0, 1, 2, 1, 12, 2, 2, 2]

二元樹的結構變為:

注意, 這裡節點標紅僅僅代表節點變化, 並不是當前節點為不可用狀態, 真正不可用狀態的判斷依據是value值為12

這樣, 如果再次分配一個4MB記憶體的ByteBuf, 根據其邏輯, 則會找到第二層的第二個節點, 也就是4-8MB

再根據我們的邏輯, 通過向上設定不可用, index為2就會設定成不可用狀態, 將value的值設定為12, 陣列數值變為[0, 1, 12, 1, 12, 12, 2, 2]二元樹如下圖所示:

這樣我們看到, 通過分配兩個4mb的byteBuf之後, 當前節點和其父節點都會設定成不可用狀態, 當index=2的節點設定為不可用之後, 將不會再找這個節點下的子節點

以此類推, 直到所有的記憶體分配完畢的時候, index為1的節點, 也會變成不可用狀態, 這樣所有的page就分配完畢, chunk中再無可用節點

我們再回到PoolArena的allocateNormal方法中

private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    //首先在原來的chunk上進行記憶體分配(1)
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        ++allocationsNormal;
        return;
    }
    //建立chunk進行記憶體分配(2)
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    long handle = c.allocate(normCapacity);
    ++allocationsNormal;
    assert handle > 0;
    //初始化byteBuf(3)
    c.initBuf(buf, handle, reqCapacity);
    qInit.add(c);
}

通過以上邏輯我們知道, long handle = c.allocate(normCapacity)這一步, 其實返回的就是memoryMap的一個下標, 通過這個下標, 我們能唯一的定位一塊記憶體

繼續往下跟, 通過c.initBuf(buf, handle, reqCapacity)初始化ByteBuf之後, 通過qInit.add(c)將新建立的chunk新增到chunkList中

我們跟到initBuf方法中去

void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);
    if (bitmapIdx == 0) {
        byte val = value(memoryMapIdx);
        assert val == unusable : String.valueOf(val);
        buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), 
                 arena.parent.threadCache());
    } else {
        initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
    }
}

這裡通過memoryMapIdx(handle)找到memoryMap的下標, 其實就是handle的值

bitmapIdx(handle)是有關subPage中使用到的邏輯, 如果是page級別的分配, 這裡只返回0, 所以進入到if塊中

if中首先斷言當前節點是不是不可用狀態, 然後通過init方法進行初始化

其中runOffset(memoryMapIdx)表示偏移量, 偏移量相當於分配給緩衝區的這塊記憶體相對於chunk中申請的記憶體的首地址偏移了多少

引數runLength(memoryMapIdx), 表示根據下標獲取可分配的最大長度

我們跟到init中, 這裡會走到PooledByteBuf的init方法中:

void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
    //初始化
    assert handle >= 0;
    assert chunk != null;
    //在哪一塊記憶體上進行分配的
    this.chunk = chunk;
    //這一塊記憶體上的哪一塊連續記憶體
    this.handle = handle;
    memory = chunk.memory;
    this.offset = offset;
    this.length = length;
    this.maxLength = maxLength;
    tmpNioBuf = null;
    this.cache = cache;
}

這裡又是我們熟悉的部分, 將屬性進行了初始化

以上就是完整的DirectUnsafePooledByteBuf在page級別的完整分配的流程, 邏輯也是非常的複雜, 想真正的掌握熟練, 也需要多下功夫進行偵錯和剖析,更多關於Netty分散式ByteBuf使用page級別記憶體分配的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com