首頁 > 軟體

Netty分散式高效能工具類異執行緒下回收物件解析

2022-03-29 19:00:24

前文傳送門:Netty分散式高效能工具類同執行緒下回收物件解析

異執行緒回收物件

就是建立物件和回收物件不在同一條執行緒的情況下, 物件回收的邏輯

我們之前小節簡單介紹過, 異執行緒回收物件, 是不會放在當前執行緒的stack中的, 而是放在一個WeakOrderQueue的資料結構中, 回顧我們之前的一個圖:

8-6-1

相關的邏輯, 我們跟到原始碼中:

首先從回收物件的入口方法開始, DefualtHandle的recycle方法:

public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }
    stack.push(this);
}

這部分我們並不陌生, 跟到push方法中:

void push(DefaultHandle<?> item) {
    Thread currentThread = Thread.currentThread();
    if (thread == currentThread) {
        pushNow(item);
    } else {
        pushLater(item, currentThread);
    }
}

上一小節分析過, 同執行緒會走到pushNow, 有關具體邏輯也進行了分析

如果不是同執行緒, 則會走到pushLater方法, 傳入handle物件和當前執行緒物件

跟到pushLater方法中

private void pushLater(DefaultHandle<?> item, Thread thread) {
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    WeakOrderQueue queue = delayedRecycled.get(this);
    if (queue == null) {
        if (delayedRecycled.size() >= maxDelayedQueues) {
            delayedRecycled.put(this, WeakOrderQueue.DUMMY);
            return;
        }
        if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
            return;
        }
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        return;
    }
    queue.add(item);
}

首先通過DELAYED_RECYCLED.get()獲取一個delayedRecycled物件

我們跟到DELAYED_RECYCLED中:

private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
        new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
    @Override
    protected Map<Stack<?>, WeakOrderQueue> initialValue() {
        return new WeakHashMap<Stack<?>, WeakOrderQueue>();
    }
};

這裡我們看到DELAYED_RECYCLED是一個FastThreadLocal物件, initialValue方法建立一個WeakHashMap物件, WeakHashMap是一個map, key為stack, value為我們剛才提到過的WeakOrderQueue

從中我們可以分析到, 每個執行緒都維護了一個WeakHashMap物件

WeakHashMap中的元素, 是一個stack和WeakOrderQueue的對映, 說明了不同的stack, 對應不同的WeakOrderQueue

這裡的對映關係可以舉個例子說明:

比如執行緒1建立了一個物件, 線上程3進行了回收, 執行緒2建立了一個物件, 同樣也線上程3進行了回收, 那麼執行緒3對應的WeakHashMap中就會有兩個元素:

執行緒1的stack和執行緒2的WeakOrderQueue, 執行緒2和stack和執行緒2的WeakOrderQueue

我們回到pushLater方法中:

繼續往下看:

WeakOrderQueue queue = delayedRecycled.get(this)

拿到了當前執行緒的WeakHashMap物件delayedRecycled之後, 然後通過delayedRecycled建立物件的執行緒的stack, 拿到WeakOrderQueue

這裡的this, 就是建立物件的那個執行緒所屬的stack, 這個stack是繫結在handle中的, 建立handle物件時候進行的繫結

假設當前執行緒是執行緒2, 建立handle的執行緒是執行緒1, 這裡通過handle的stack拿到執行緒1的WeakOrderQueue

 if (queue == null) 說明執行緒2沒有回收過執行緒1的物件, 則進入if塊的邏輯:

首先看判斷 if (delayedRecycled.size() >= maxDelayedQueues) 

 delayedRecycled.size() 表示當前執行緒回收其他建立物件的執行緒的執行緒個數, 也就是有幾個其他的執行緒在當前執行緒回收物件

maxDelayedQueues表示最多能回收的執行緒個數, 這裡如果朝超過這個值, 就表示當前執行緒不能在回收其他執行緒的物件了

通過 delayedRecycled.put(this, WeakOrderQueue.DUMMY) 標記, 建立物件的執行緒的stack, 所對應的WeakOrderQueue不可用, DUMMY我們可以理解為不可用

如果沒有超過maxDelayedQueues, 則通過if判斷中的 WeakOrderQueue.allocate(this, thread) 這種方式建立一個WeakOrderQueue

allocate傳入this, 也就是建立物件的執行緒對應的stack, 假設是執行緒1, thread就是當前執行緒, 假設是執行緒2

跟到allocate方法中

static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
    return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
            ? new WeakOrderQueue(stack, thread) : null;
}

reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)表示執行緒1的stack還能不能分配LINK_CAPACITY個元素, 如果可以, 則直接通過new的方式建立一個WeakOrderQueue物件

再跟到reserveSpace方法中:

private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
    assert space >= 0;
    for (;;) {
        int available = availableSharedCapacity.get();
        if (available < space) {
            return false;
        }
        if (availableSharedCapacity.compareAndSet(available, available - space)) {
            return true;
        }
    }
}

引數availableSharedCapacity表示執行緒1的stack允許外部執行緒給其快取多少個物件, 之前我們分析過是16384, space預設是16

方法中通過一個cas操作, 將16384減去16, 表示stack可以給其他執行緒快取的物件數為16384-16

而這16個元素, 將由執行緒2快取

回到pushLater方法中

建立之後通過 delayedRecycled.put(this, queue) 將stack和WeakOrderQueue進行關聯

最後通過queue.add(item), 將建立的WeakOrderQueue新增一個handle

講解WeakOrderQueue之前, 我們首先了解下WeakOrderQueue的資料結構

WeakOrderQueue維護了多個link, link之間是通過連結串列進行連線, 每個link可以盛放16個handle,

我們剛才分析過, 在reserveSpace方法中將 stack.availableSharedCapacity-16 , 其實就表示了先分配16個空間放在link裡, 下次回收的時候, 如果這16空間沒有填滿, 則可以繼續往裡盛放

如果16個空間都已填滿, 則通過繼續新增link的方式繼續分配16個空間用於盛放handle

WeakOrderQueue和WeakOrderQueue之間也是通過連結串列進行關聯

可以根據下圖理解上述邏輯:

8-6-2

根據以上思路, 我們跟到WeakOrderQueue的構造方法中:

private WeakOrderQueue(Stack<?> stack, Thread thread) {
    head = tail = new Link();
    owner = new WeakReference<Thread>(thread);
    synchronized (stack) {
        next = stack.head;
        stack.head = this;
    }
    availableSharedCapacity = stack.availableSharedCapacity;
}

這裡有個head和tail, 都指向一個link物件, 這裡我們可以分析到, 其實在WeakOrderQueue中維護了一個連結串列, head分別代表頭結點和尾節點, 初始狀態下, 頭結點和尾節點都指向同一個節點

簡單看下link的類的定義

private static final class Link extends AtomicInteger {
    private final DefaultHandle&lt;?&gt;[] elements = new DefaultHandle[LINK_CAPACITY];
    private int readIndex;
    private Link next;
}

每次建立一個Link, 都會建立一個DefaultHandle型別的陣列用於盛放DefaultHandle物件, 預設大小是16個

readIndex是一個讀指標, 我們之後小節會進行分析

next節點則指向下一個link

回到WeakOrderQueue的構造方法中:

owner是對向前執行緒進行一個包裝, 代表了當前執行緒

接下來在一個同步塊中, 將當前建立的WeakOrderQueue插入到stack指向的第一個WeakOrderQueue, 也就是stack的head屬性, 指向我們建立的WeakOrderQueue, 如圖所示

8-6-3

如果執行緒2建立一個和stack關聯的WeakOrderQueue, stack的head節點就就會指向執行緒2建立WeakOrderQueue

如果之後執行緒3也建立了一個和stack關聯的WeakOrderQueue, stack的head節點就會指向新建立的執行緒3的WeakOrderQueue

然後執行緒3的WeakOrderQueue再指向執行緒2的WeakOrderQueue

也就是無論哪個執行緒建立一個和同一個stack關聯的WeakOrderQueue的時候, 都插入到stack指向的WeakOrderQueue列表的頭部

這樣就可以將stack和其他執行緒釋放物件的容器WeakOrderQueue進行繫結

回到pushLater方法中

private void pushLater(DefaultHandle<?> item, Thread thread) {
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    WeakOrderQueue queue = delayedRecycled.get(this);
    if (queue == null) {
        if (delayedRecycled.size() >= maxDelayedQueues) {
            delayedRecycled.put(this, WeakOrderQueue.DUMMY);
            return;
        }
        if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
            return;
        }
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        return;
    }
    queue.add(item);
}

根據之前分析的WeakOrderQueue的資料結構, 我們分析最後一步, 也就是WeakOrderQueue的add方法

我們跟進WeakOrderQueue的add方法:

void add(DefaultHandle<?> handle) {
    handle.lastRecycledId = id;
    Link tail = this.tail;
    int writeIndex;
    if ((writeIndex = tail.get()) == LINK_CAPACITY) {
        if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) {
            return;
        }
        this.tail = tail = tail.next = new Link();
        writeIndex = tail.get();
    }
    tail.elements[writeIndex] = handle;
    handle.stack = null;
    tail.lazySet(writeIndex + 1);
}

首先, 看 handle.lastRecycledId = id 

lastRecycledId表示handle上次回收的id, 而id表示WeakOrderQueue的id, weakOrderQueue每次建立的時候, 會為自增一個唯一的id

 Link tail = this.tail 表示拿到當前WeakOrderQueue的中指向最後一個link的指標, 也就是尾指標

再看 if ((writeIndex = tail.get()) == LINK_CAPACITY) 

tail.get()表示獲取當前link中已經填充元素的個數, 如果等於16, 說明元素已經填充滿

然後通過eserveSpace方法判斷當前WeakOrderQueue是否還能快取stack的物件, eserveSpace方法我們剛才已經分析過, 會根據stack的屬性availableSharedCapacity-16的方式判斷還能否快取stack的物件, 如果不能再快取stack的物件, 則返回

如果還能繼續快取, 則在建立一個link, 並將尾節點指向新建立的link, 並且原來尾節點的next的節點指向新建立的link

然後拿到當前link的writeIndex, 也就是寫指標, 如果是新建立的link中沒有元素, writeIndex為0

之後將尾部的link的elements屬性, 也就是一個DefaultHandle型別的陣列, 通過陣列下標的方式將第writeIndex個節點賦值為要回收的handle

然後將handle的stack屬性設定為null, 表示當前handle不是通過stack進行回收的

最後將tail節點的元素個數進行+1, 表示下一次將從writeIndex+1的位置往裡寫

以上就是異執行緒回收物件的邏輯

以上就是Netty分散式高效能工具類異執行緒下回收物件解析的詳細內容,更多關於Netty分散式異執行緒回收物件的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com