首頁 > 軟體

詳解Java ReentrantReadWriteLock讀寫鎖的原理與實現

2022-10-18 14:00:33

概述

ReentrantReadWriteLock讀寫鎖是使用AQS的集大成者,用了獨佔模式和共用模式。本文和大家一起理解下ReentrantReadWriteLock讀寫鎖的實現原理。在這之前建議大家閱讀下下面3篇關聯文章:

深入淺出理解Java並行AQS的獨佔鎖模式

深入淺出理解Java並行AQS的共用鎖模式

通俗易懂讀寫鎖ReentrantReadWriteLock的使用

原理概述

上圖是ReentrantReadWriteLock讀寫鎖的類結構圖:

  • 實現了ReadWriteLock介面,該介面提供了獲取讀鎖和寫鎖的API。
  • ReentrantReadWriteLock讀寫鎖內部的成員變數readLock是讀鎖,指向內部類ReadLock。
  • ReentrantReadWriteLock讀寫鎖內部的成員變數writeLock是寫鎖,指向內部類WriteLock。
  • ReentrantReadWriteLock讀寫鎖內部的成員變數sync是繼承AQS的同步器,他有兩個子類FairSync公平同步器和NoFairSync非公平同步器,讀寫鎖內部也有一個sync,他們使用的是同一個sync。

讀寫鎖用的同一個sync同步器,那麼他們共用同一個state, 這樣不會混淆嗎?

不會,ReentrantReadWriteLock讀寫鎖使用了AQS中state值得低16位元表示寫鎖得計數,用高16位元表示讀鎖得計數,這樣就可以使用同一個AQS同時管理讀鎖和寫鎖。

1.ReentrantReadWriteLock類重要成員變數

// 讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
// 寫鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器
final Sync sync;

2.ReentrantReadWriteLock構造方法

//預設是非公平鎖,可以指定引數建立公平鎖
public ReentrantReadWriteLock(boolean fair) {
    // true 為公平鎖
    sync = fair ? new FairSync() : new NonfairSync();
    // 這兩個 lock 共用同一個 sync 範例,都是由 ReentrantReadWriteLock 的 sync 提供同步實現
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

3.Sync類重要成員變數

// 用來移位
static final int SHARED_SHIFT   = 16;
// 高16位元的1
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 65535,16個1,代表寫鎖的最大重入次數
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 低16位元掩碼:0b 1111 1111 1111 1111,用來獲取寫鎖重入的次數
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 獲取讀寫鎖的讀鎖分配的總次數
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
// 寫鎖(獨佔)鎖的重入次數
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

加鎖原理

圖解過程

設計一個加鎖場景,t1執行緒加寫鎖,t2執行緒加讀鎖,我們看下它們整個加鎖得流程。

1.t1 加寫鎖w.lock()成功,佔了 state 的低 16 位。

  • 這裡得state分為兩部分0_1,0表示高16位元的值,1表示低16位元的值。
  • AQS當前佔用執行緒exclusiveOwnerThread屬性指向t1執行緒。

2.t2執行緒執行加讀鎖 r.lock(),嘗試獲取鎖,發現已經被寫鎖佔據了,加鎖失敗。

3.t2執行緒被封裝成一個共用模式Node.SHARED的節點,加入到AQS的佇列中。

4.在阻塞前,t2執行緒發現自己是佇列中的老二,會嘗試再次獲取讀鎖,因為t1沒有釋放,它會失敗,然後它會把佇列的前驅節點的狀態改為-1,然後阻塞自身,也就是t2執行緒。

  • 上面中黃色三角形就是等待狀態的值,前驅節點變成-1
  • 上面中的灰色表示節點所在的執行緒阻塞了

5.後面如過有其他執行緒如t3,t4加讀鎖或者寫鎖,由於t1執行緒沒有釋放鎖,會變成下面的狀態。

上面是整個解鎖的流程,下面深入原始碼驗證這個流程。

原始碼解析

1.寫鎖加鎖原始碼

WriteLock類的lock()方法是加寫鎖的入口方法。

static final class NonfairSync extends Sync {
    // ... 省略無關程式碼
 
    // 外部類 WriteLock 方法, 方便閱讀, 放在此處
    public void lock() {
        sync.acquire(1);
    }
 
    // AQS 繼承過來的方法, 方便閱讀, 放在此處
    public final void acquire(int arg) {
        if (
            // 嘗試獲得寫鎖失敗
                !tryAcquire(arg) &&
                        // 將當前執行緒關聯到一個 Node 物件上, 模式為獨佔模式
                        // 進入 AQS 佇列阻塞
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }
 
    
    protected final boolean tryAcquire(int acquires) {
        // 獲取當前執行緒
        Thread current = Thread.currentThread();
        //獲得鎖的狀態
        int c = getState();
        // 獲得低 16 位, 代表寫鎖的 state 計數
        int w = exclusiveCount(c);
         // c不等於0表示加了讀鎖或者寫鎖
        if (c != 0) {
            if (
                // c != 0 and w == 0 表示有讀鎖返回錯誤,讀鎖不支援鎖升級, 或者
                    w == 0 ||
                        	// w != 0 說明有寫鎖,寫鎖的擁有者不是自己,獲取失敗
                            current != getExclusiveOwnerThread()
            ) {
                // 獲得鎖失敗
                return false;
            }
            // 寫鎖計數超過低 16 位最大數量, 報異常
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // 寫鎖重入, 獲得鎖成功,沒有並行,所以不使用 CAS
            setState(c + acquires);
            return true;
        }
        if (
             // c == 0,說明沒有任何鎖,判斷寫鎖是否該阻塞,是 false 就嘗試獲取鎖,失敗返回 false
                writerShouldBlock() ||
                        // 嘗試更改計數失敗
                        !compareAndSetState(c, c + acquires)
        ) {
            // 獲得鎖失敗
            return false;
        }
        // 獲得鎖成功,設定鎖的持有執行緒為當前執行緒
        setExclusiveOwnerThread(current);
        return true;
    }
 
    // 非公平鎖 writerShouldBlock 總是返回 false, 無需阻塞
    final boolean writerShouldBlock() {
        return false; 
    }
    // 公平鎖會檢查 AQS 佇列中是否有前驅節點, 沒有(false)才去競爭
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
}
  • tryAcquire()方法是模板方法,由子類自定義實現獲取鎖的邏輯。
  • 執行緒如果獲取寫鎖失敗的話,通過acquireQueued()方法封裝成獨佔Node加入到AQS佇列中。

2.讀鎖加鎖原始碼

ReadLock類的lock()方法是加讀鎖的入口方法,呼叫tryAcquireShared()方法嘗試獲取讀鎖,返回負數,失敗,加入到佇列中。

// 加讀鎖的方法入口
public void lock() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    // tryAcquireShared 返回負數, 表示獲取讀鎖失敗,加入到佇列中
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

tryAcquireShared()方法是一個模板方法,AQS類中定義語意,子類實現,如果返回1,表示獲取鎖成功,還有剩餘資源,返回0表示獲取成功,沒有剩餘資源,返回-1表示失敗。

// 嘗試以共用模式獲取,返回1表示獲取鎖成功,還有剩餘資源,返回0表示獲取成功,沒有剩餘資源,返回-1,表示失敗
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // exclusiveCount(c) 代表低 16 位, 寫鎖的 state,成立說明有執行緒持有寫鎖
    // 寫鎖的持有者不是當前執行緒,則獲取讀鎖失敗,【寫鎖允許降級】
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;
    
    // 高 16 位,代表讀鎖的 state,共用鎖分配出去的總次數
    int r = sharedCount(c);
    // 讀鎖是否應該阻塞
    if (!readerShouldBlock() &&	r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {		// 嘗試增加讀鎖計數
        // 加鎖成功
        // 加鎖之前讀鎖為 0,說明當前執行緒是第一個讀鎖執行緒
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        // 第一個讀鎖執行緒是自己就發生了讀鎖重入
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            // cachedHoldCounter 設定為當前執行緒的 holdCounter 物件,即最後一個獲取讀鎖的執行緒
            HoldCounter rh = cachedHoldCounter;
            // 說明還沒設定 rh
            if (rh == null || rh.tid != getThreadId(current))
                // 獲取當前執行緒的鎖重入的物件,賦值給 cachedHoldCounter
                cachedHoldCounter = rh = readHolds.get();
            // 還沒重入
            else if (rh.count == 0)
                readHolds.set(rh);
            // 重入 + 1
            rh.count++;
        }
        // 讀鎖加鎖成功
        return 1;
    }
    // 邏輯到這 應該阻塞,或者 cas 加鎖失敗
    // 會不斷嘗試 for (;;) 獲取讀鎖, 執行過程中無阻塞
    return fullTryAcquireShared(current);
}

// 非公平鎖 readerShouldBlock 偏向寫鎖一些,看 AQS 阻塞佇列中第一個節點是否是寫鎖,是則阻塞,反之不阻塞
// 防止一直有讀鎖執行緒,導致寫鎖執行緒飢餓
// true 則該阻塞, false 則不阻塞
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}

// 下面是公平鎖的readerShouldBlock
// 公平鎖會檢查 AQS 佇列中是否有前驅節點, 沒有(false)才去競爭
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}

fullTryAcquireShared()方法是通過自旋的方式不斷獲取讀鎖,因為由於前面的readerShouldBlock返回false或者cas失敗,導致沒有獲取到鎖,需要不斷重試。

final int fullTryAcquireShared(Thread current) {
    // 當前讀鎖執行緒持有的讀鎖次數物件
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        // 說明有執行緒持有寫鎖
        if (exclusiveCount(c) != 0) {
            // 寫鎖不是自己則獲取鎖失敗
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            // 條件成立說明當前執行緒是 firstReader,當前鎖是讀忙碌狀態,而且當前執行緒也是讀鎖重入
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    // 最後一個讀鎖的 HoldCounter
                    rh = cachedHoldCounter;
                    // 說明當前執行緒也不是最後一個讀鎖
                    if (rh == null || rh.tid != getThreadId(current)) {
                        // 獲取當前執行緒的 HoldCounter
                        rh = readHolds.get();
                        // 條件成立說明 HoldCounter 物件是上一步程式碼新建的
                        // 當前執行緒不是鎖重入,在 readerShouldBlock() 返回 true 時需要去排隊
                        if (rh.count == 0)
                            // 防止記憶體漏失
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        // 越界判斷
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 讀鎖加鎖,條件內的邏輯與 tryAcquireShared 相同
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

doAcquireShared()是在獲取讀鎖失敗的時候加入AQS佇列的邏輯。

private void doAcquireShared(int arg) {
    // 將當前執行緒關聯到一個 Node 物件上, 模式為共用模式
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 獲取前驅節點
            final Node p = node.predecessor();
            // 如果前驅節點就頭節點就去嘗試獲取鎖
            if (p == head) {
                // 再一次嘗試獲取讀鎖
                int r = tryAcquireShared(arg);
                // r >= 0 表示獲取成功
                if (r >= 0) {
                    //【這裡會設定自己為頭節點,喚醒相連的後序的共用節點】
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 是否在獲取讀鎖失敗時阻塞      					 park 當前執行緒
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate()方法是在後續讀鎖被喚醒後,搶到鎖要處理的邏輯,包括修改佇列的頭結點,以及喚醒佇列中的下一個共用節點。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    // 設定自己為 head 節點
    setHead(node);
    // propagate 表示有共用資源(例如共用讀鎖或號誌),為 0 就沒有資源
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // 獲取下一個節點
        Node s = node.next;
        // 如果當前是最後一個節點,或者下一個節點是【等待共用讀鎖的節點】
        if (s == null || s.isShared())
            // 喚醒後繼節點
            doReleaseShared();
    }
}

解鎖原理

圖解過程

由於上面t1執行緒加的寫鎖,所有其他的執行緒都被阻塞了,只有在t1執行緒解鎖以後,其他執行緒才能被喚醒,我們現在看下t1執行緒被喚醒了,會發生什麼?

1.t1執行緒執行解鎖w.unlock()成功,修改AQS中的state。

  • 這裡的state變為了0_0。
  • AQS當前佔用執行緒exclusiveOwnerThread屬性變為null。

2.t1執行緒喚醒佇列中等待的老二, 為什麼不是老大,因為老大是一個空節點,不會設定任何的執行緒。t2執行緒被喚醒後,搶鎖成功,修改state中高16位元為1。

  • 老二的執行緒節點變為藍色節點
  • AQS中的state變為1_0。

3.t2執行緒恢復執行,設定原來的老二節點為頭節點

4.t2執行緒要做的事情還沒結束呢,因為是共用模式,它現在釋放了,就此時也喚醒佇列中的下一個共用節點。

5.t3執行緒恢復去競爭讀鎖成功,這時state的高位+1,變成2。

6.這時候t3執行緒所在的Node設定為頭節點,同時發現對列的下一個節點不是共用節點,而是獨佔節點,就不會喚醒後面的節點了。

7.之後t2執行緒和t3執行緒進入尾聲,執行r.unlock操作,state的計數減一,直到變為0。

8.最後寫鎖執行緒t4被喚醒,去搶佔鎖成功,整個流程結束。

上面是整個解鎖的流程,下面深入原始碼驗證這個流程。

原始碼解析

1.寫鎖釋放流程

WriteLock類的unlock()方法是入口方法,呼叫tryRelease()方法釋放鎖,如果成功,呼叫unparkSuccessor()方法喚醒執行緒。

public void unlock() {
    // 釋放鎖
    sync.release(1);
}
public final boolean release(int arg) {
    // 嘗試釋放鎖
    if (tryRelease(arg)) {
        Node h = head;
        // 頭節點不為空並且不是等待狀態不是 0,喚醒後繼的非取消節點
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease()方法是AQS提供的模板方法,返回true表示成功,false失敗,由自定義同步器實現。

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    // 因為可重入的原因, 寫鎖計數為 0, 才算釋放成功
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        // 設定佔用執行緒為null
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

2.讀鎖釋放流程

ReadLock類的unlock()方法是釋放共用鎖的入口方法。

public void unlock() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared()方法是由AQS提供的模板方法,由自定義同步器實現。

protected final boolean tryReleaseShared(int unused) {
    //自選
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        // 讀鎖的計數不會影響其它獲取讀鎖執行緒, 但會影響其它獲取寫鎖執行緒,計數為 0 才是真正釋放
        if (compareAndSetState(c, nextc))
            // 返回是否已經完全釋放了 
            return nextc == 0;
    }
}

呼叫doReleaseShared()方法喚醒等待的執行緒,這個方法呼叫的地方有兩處,還記得嗎,一個這是裡的解鎖,還有一個是前面加共用鎖阻塞的地方,喚醒後獲取鎖成功,也會呼叫doReleaseShared()方法。

private void doReleaseShared() {
    // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一個節點 unpark
	// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // SIGNAL 喚醒後繼
            if (ws == Node.SIGNAL) {
                // 因為讀鎖共用,如果其它執行緒也在釋放讀鎖,那麼需要將 waitStatus 先改為 0
            	// 防止 unparkSuccessor 被多次執行
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;  
                // 喚醒後繼節點
                unparkSuccessor(h);
            }
            // 如果已經是 0 了,改為 -3,用來解決傳播性
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        // 條件不成立說明被喚醒的節點非常積極,直接將自己設定為了新的 head,
        // 此時喚醒它的節點(前驅)執行 h == head 不成立,所以不會跳出迴圈,會繼續喚醒新的 head 節點的後繼節點
        if (h == head)                   
            break;
    }
}

以上就是詳解Java ReentrantReadWriteLock讀寫鎖的原理與實現的詳細內容,更多關於Java ReentrantReadWriteLock讀寫鎖的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com