首頁 > 軟體

ReentrantReadWriteLock 讀寫鎖分析總結

2022-05-30 18:01:38

一、讀寫鎖簡介

現實中有這樣一種場景:對共用資源有讀和寫的操作,且寫操作沒有讀操作那麼頻繁(讀多寫少)。在沒有寫操作的時候,多個執行緒同時讀一個資源沒有任何問題,所以應該允許多個執行緒同時讀取共用資源(讀讀可以並行);但是如果一個執行緒想去寫這些共用資源,就不應該允許其他執行緒對該資源進行讀和寫操作了(讀寫,寫讀,寫寫互斥)。在讀多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的並行性和吞吐量。

針對這種場景,JAVA的並行包提供了讀寫鎖 ReentrantReadWriteLock,它內部,維護了一對相關的鎖,一個用於唯讀操作,稱為讀鎖;一個用於寫入操作,稱為寫鎖,描述如下:執行緒進入讀鎖的前提條件:

  • 沒有其他執行緒的寫鎖
  • 沒有寫請求或者有寫請求,但呼叫執行緒和持有鎖的執行緒是同一個。

執行緒進入寫鎖的前提條件:

  • 沒有其他執行緒的讀鎖
  • 沒有其他執行緒的寫鎖

而讀寫鎖有以下三個重要的特性:

  • 公平選擇性:支援非公平(預設)和公平的鎖獲取方式,吞吐量還是非公平優於公平。
  • 可重入:讀鎖和寫鎖都支援執行緒重入。以讀寫執行緒為例:讀執行緒獲取讀鎖後,能夠再次獲取讀鎖。寫執行緒在獲取寫鎖之後能夠再次獲取寫鎖,同時也可以獲取讀鎖。
  • 鎖降級:遵循獲取寫鎖、再獲取讀鎖最後釋放寫鎖的次序,寫鎖能夠降級成為讀鎖。

看了上面的描述大家可能有點暈,我就舉一個之前開發訂單的例子,輔助大家理解。 我們的訂單有一個主單和子單的概念:主單編碼為 orderCode, 子單編碼為 subOrderCode 對應關係是 1:N。 我在退款的時候,需要支援子單,主單退款。 子單退款,的維度是 subOrderCode 主單退款,的維度是 orderCode 可能出現並行的情況,我們可以對 orderCode 加一把讀寫鎖

  • 如果是主單退款的情況,是不是子單退款就是互斥的
  • 如果是子單退款的情況,其實就可以並行的,但是子單是 subOrderCode維度,還需要加一個 subOrderCode 的互斥鎖。

二、讀寫鎖使用

如何同時儲存讀寫鎖,可以通過 state 的值進行儲存,高 16 位表示讀鎖,低 16 位表示寫鎖。 比如: 0000 0000 0000 0000 (1<<16) 0000 0000 0000 0000 高 16 位不為0: 有讀鎖 c >>>16 低 16 位不為0: 有寫鎖 5

ReadWriteLock 介面

我們可以看到 ReentranReadWriteLock 有兩把鎖,一把讀鎖,一把寫鎖。 

使用例子

快取操作:

public class ReentrantReadWriteLockCacheTest {
    static Map<String, Object> map = new HashMap<String, Object>();
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    static Lock r = rwl.readLock();
    static Lock w = rwl.writeLock();
    // 獲取一個key對應的value
    public static final Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }

    // 設定key對應的value,並返回舊的value
    public static final Object put(String key, Object value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }

    // 清空所有的內容
    public static final void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
}

上述範例中,Cache組合一個非執行緒安全的HashMap作為快取的實現,同時使用讀寫鎖的 讀鎖和寫鎖來保證Cache是執行緒安全的。在讀操作get(String key)方法中,需要獲取讀鎖,這 使得並行存取該方法時不會被阻塞。寫操作put(String key,Object value)方法和clear()方法, 在更新 HashMap時必須提前獲取寫鎖,當獲取寫鎖後,其他執行緒對於讀鎖和寫鎖的獲取均被 阻塞,而 只有寫鎖被釋放之後,其他讀寫操作才能繼續。Cache使用讀寫鎖提升讀操作的並行 性,也保證每次寫操作對所有的讀寫操作的可見性,同時簡化了程式設計方式。

三、鎖的降級

鎖降級指的是寫鎖降級成為讀鎖。如果當前執行緒擁有寫鎖,然後將其釋放,最後再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級是指把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨後釋放(先前擁有的)寫鎖的過程。鎖降級可以幫助我們拿到當前執行緒修改後的結果而不被其他執行緒所破壞,防止更新丟失。

鎖降級的使用範例

因為資料不常變化,所以多個執行緒可以並行地進行資料處理,當資料變更後,如果當前執行緒感知到資料變化,則進行資料的準備工作,同時其他處理執行緒被阻塞,直到當前執行緒完成資料的準備工作。

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock readLock = rwl.readLock();
private final Lock writeLock = rwl.writeLock();
private volatile boolean update = false;
public void processData() {
    readLock.lock();
    if (!update) {
        // 必須先釋放讀鎖
        readLock.unlock();
        // 鎖降級從寫鎖獲取到開始
        writeLock.lock();
        try {
            if (!update) {
                // TODO 準備資料的流程(略)
                update = true;
            }
            readLock.lock();
        } finally {
            writeLock.unlock();
        }
        // 鎖降級完成,寫鎖降級為讀鎖
    }
    try {
        //TODO  使用資料的流程(略)
    } finally {
        readLock.unlock();
    }
}

注意事項:

  • 讀鎖不支援條件變數
  • 重入時不升級不支援:持有讀鎖的情況下,去獲取寫鎖,會導致永久等待
  • 重入時支援降級:持有寫鎖的情況下可以去獲取讀鎖

四、ReentranReadWriteLock 結構

方法結構設計

讀寫狀態設計

五、原始碼分析

寫鎖的加鎖

方法 tryAcquire 是寫鎖的加鎖核心邏輯

protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    // 獲取寫鎖狀態
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 重入
        setState(c + acquires);
        return true;
    }
    // 獲取寫鎖
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 設定寫鎖 owner
    setExclusiveOwnerThread(current);
    return true;
}

寫鎖的釋放

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

讀鎖的獲取

protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 首次獲取讀鎖
        if (r == 0) {
            firstReader = current;
            // 第一個執行緒重入
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 重入
            firstReaderHoldCount++;
        } else {
            // 後續執行緒,通過 ThreadLocal 獲取重入次數
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

fullTryAcquireShared方法如下:

final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

讀鎖的釋放

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

到此這篇關於ReentrantReadWriteLock 讀寫鎖分析總結的文章就介紹到這了,更多相關ReentrantReadWriteLock 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com