首頁 > 軟體

圖解Java ReentrantLock的條件變數Condition機制

2022-10-16 14:04:15

概述

想必大家都使用過wait()和notify()這兩個方法把,這兩個方法主要用於多執行緒間的協同處理,即控制執行緒之間的等待、通知、切換及喚醒。而RenentrantLock也支援這樣條件變數的能力,而且相對於synchronized 更加強大,能夠支援多個條件變數。

最好可以先閱讀ReentrantLock系列文章:

圖解Java ReentrantLock公平鎖和非公平鎖的實現

詳解Java ReentrantLock可重入,可打斷,鎖超時的實現原理

ReentrantLock條件變數使用

ReentrantLock類API

Condition newCondition(): 建立條件變數物件

Condition類API

  • void await(): 當前執行緒從執行狀態進入等待狀態,同時釋放鎖,該方法可以被中斷
  • void awaitUninterruptibly():當前執行緒從執行狀態進入等待狀態,該方法不能夠被中斷
  • void signal(): 喚醒一個等待在 Condition 條件佇列上的執行緒
  • void signalAll(): 喚醒阻塞在條件佇列上的所有執行緒
@Test
public void testCondition() throws InterruptedException {
    ReentrantLock lock = new ReentrantLock();
    //建立新的條件變數
    Condition condition = lock.newCondition();
    Thread thread0 = new Thread(() -> {
        lock.lock();
        try {
            System.out.println("執行緒0獲取鎖");
            // sleep不會釋放鎖
            Thread.sleep(500);
            //進入休息室等待
            System.out.println("執行緒0釋放鎖,進入等待");
            condition.await();
            System.out.println("執行緒0被喚醒了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    });
    thread0.start();
    //叫醒
    Thread thread1 = new Thread(() -> {
        lock.lock();
        try {
            System.out.println("執行緒1獲取鎖");
            //喚醒
            condition.signal();
            System.out.println("執行緒1喚醒執行緒0");
        } finally {
            lock.unlock();
            System.out.println("執行緒1釋放鎖");
        }
    });
    thread1.start();

    thread0.join();
    thread1.join();
}

執行結果:

  • condition的wait和notify必須在lock範圍內
  • 實現條件變數的等待和喚醒,他們必須是同一個condition。
  • 執行緒1執行conidtion.notify()後,並沒有釋放鎖,需要等釋放鎖後,執行緒0重新獲取鎖成功後,才能繼續向下執行。

圖解實現原理

await過程

1.執行緒0(Thread-0)一開始獲取鎖,exclusiveOwnerThread欄位是Thread-0, 如下圖中的深藍色節點

2.Thread-0呼叫await方法,Thread-0封裝成Node進入ConditionObject的佇列,因為此時只有一個節點,所有firstWaiter和lastWaiter都指向Thread-0,會釋放鎖資源,NofairSync中的state會變成0,同時exclusiveOwnerThread設定為null。如下圖所示。

3.執行緒1(Thread-1)被喚醒,重新獲取鎖,如下圖的深藍色節點所示。

4.Thread-0被park阻塞,如下圖灰色節點所示:

原始碼如下:

下面是await()方法的整體流程,其中LockSupport.park(this)進行阻塞當前執行緒,後續喚醒,也會在這個程式點恢復執行。

public final void await() throws InterruptedException {
     // 判斷當前執行緒是否是中斷狀態,是就直接給箇中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 將呼叫 await 的執行緒包裝成 Node,新增到條件佇列並返回
    Node node = addConditionWaiter();
    // 完全釋放節點持有的鎖,因為其他執行緒喚醒當前執行緒的前提是【持有鎖】
    int savedState = fullyRelease(node);
    
    // 設定打斷模式為沒有被打斷,狀態碼為 0
    int interruptMode = 0;
    
    // 如果該節點還沒有轉移至 AQS 阻塞佇列, park 阻塞,等待進入阻塞佇列
    while (!isOnSyncQueue(node)) {
        // 阻塞當前執行緒,待會
        LockSupport.park(this);
        // 如果被打斷,退出等待佇列,對應的 node 【也會被遷移到阻塞佇列】尾部,狀態設定為 0
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 邏輯到這說明當前執行緒退出等待佇列,進入【阻塞佇列】
    
    // 嘗試槍鎖,釋放了多少鎖就【重新獲取多少鎖】,獲取鎖成功判斷打斷模式
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    
    // node 在條件佇列時 如果被外部執行緒中斷喚醒,會加入到阻塞佇列,但是並未設 nextWaiter = null
    if (node.nextWaiter != null)
        // 清理條件佇列內所有已取消的 Node
        unlinkCancelledWaiters();
    // 條件成立說明掛起期間發生過中斷
    if (interruptMode != 0)
        // 應用打斷模式
        reportInterruptAfterWait(interruptMode);
}

將執行緒封裝成Node, 加入到ConditionObject佇列尾部,此時節點的等待狀態時-2。

private Node addConditionWaiter() {
    // 獲取當前條件佇列的尾節點的參照,儲存到區域性變數 t 中
    Node t = lastWaiter;
    // 當前佇列中不是空,並且節點的狀態不是 CONDITION(-2),說明當前節點發生了中斷
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 清理條件佇列內所有已取消的 Node
        unlinkCancelledWaiters();
        // 清理完成重新獲取 尾節點 的參照
        t = lastWaiter;
    }
    // 建立一個關聯當前執行緒的新 node, 設定狀態為 CONDITION(-2),新增至佇列尾部
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;		// 空佇列直接放在隊首【不用CAS因為執行執行緒是持鎖執行緒,並行安全】
    else
        t.nextWaiter = node;	// 非空佇列隊尾追加
    lastWaiter = node;			// 更新隊尾的參照
    return node;
}

清理條件佇列中的cancel型別的節點,比如中斷、超時等會導致節點轉換為Cancel

// 清理條件佇列內所有已取消(不是CONDITION)的 node,【連結串列刪除的邏輯】
private void unlinkCancelledWaiters() {
    // 從頭節點開始遍歷【FIFO】
    Node t = firstWaiter;
    // 指向正常的 CONDITION 節點
    Node trail = null;
    // 等待佇列不空
    while (t != null) {
        // 獲取當前節點的後繼節點
        Node next = t.nextWaiter;
        // 判斷 t 節點是不是 CONDITION 節點,條件佇列內不是 CONDITION 就不是正常的
        if (t.waitStatus != Node.CONDITION) { 
            // 不是正常節點,需要 t 與下一個節點斷開
            t.nextWaiter = null;
            // 條件成立說明遍歷到的節點還未碰到過正常節點
            if (trail == null)
                // 更新 firstWaiter 指標為下個節點
                firstWaiter = next;
            else
                // 讓上一個正常節點指向 當前取消節點的 下一個節點,【刪除非正常的節點】
                trail.nextWaiter = next;
            // t 是尾節點了,更新 lastWaiter 指向最後一個正常節點
            if (next == null)
                lastWaiter = trail;
        } else {
            // trail 指向的是正常節點 
            trail = t;
        }
        // 把 t.next 賦值給 t,迴圈遍歷
        t = next; 
    }
}

fullyRelease方法將r讓Thread-0釋放鎖, 這個時候Thread-1就會去競爭鎖

// 執行緒可能重入,需要將 state 全部釋放
final int fullyRelease(Node node) {
    // 完全釋放鎖是否成功,false 代表成功
    boolean failed = true;
    try {
        // 獲取當前執行緒所持有的 state 值總數
        int savedState = getState();
        // release -> tryRelease 解鎖重入鎖
        if (release(savedState)) {
            // 釋放成功
            failed = false;
            // 返回解鎖的深度
            return savedState;
        } else {
            // 解鎖失敗丟擲異常
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 沒有釋放成功,將當前 node 設定為取消狀態
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

判斷節點是否在AQS阻塞對列中,不在條件對列中

final boolean isOnSyncQueue(Node node) {
    // node 的狀態是 CONDITION,signal 方法是先修改狀態再遷移,所以前驅節點為空證明還【沒有完成遷移】
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 說明當前節點已經成功入隊到阻塞佇列,且當前節點後面已經有其它 node,因為條件佇列的 next 指標為 null
    if (node.next != null)
        return true;
	// 說明【可能在阻塞佇列,但是是尾節點】
    // 從阻塞佇列的尾節點開始向前【遍歷查詢 node】,如果查詢到返回 true,查詢不到返回 false
    return findNodeFromTail(node);
}

signal過程

1.Thread-1執行signal方法喚醒條件佇列中的第一個節點,即Thread-0,條件佇列置空

2.Thread-0的節點的等待狀態變更為0, 重新加入到AQS佇列尾部。

3.後續就是Thread-1釋放鎖,其他執行緒重新搶鎖。

原始碼如下:

signal()方法是喚醒的入口方法

public final void signal() {
    // 判斷呼叫 signal 方法的執行緒是否是獨佔鎖持有執行緒
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 獲取條件佇列中第一個 Node
    Node first = firstWaiter;
    // 不為空就將第該節點【遷移到阻塞佇列】
    if (first != null)
        doSignal(first);
}

呼叫doSignal()方法喚醒節點

// 喚醒 - 【將沒取消的第一個節點轉移至 AQS 佇列尾部】
private void doSignal(Node first) {
    do {
        // 成立說明當前節點的下一個節點是 null,當前節點是尾節點了,佇列中只有當前一個節點了
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    // 將等待佇列中的 Node 轉移至 AQS 佇列,不成功且還有節點則繼續迴圈
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

// signalAll() 會呼叫這個函數,喚醒所有的節點
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    // 喚醒所有的節點,都放到阻塞佇列中
    } while (first != null);
}

呼叫transferForSignal()方法,先將節點的 waitStatus 改為 0,然後加入 AQS 阻塞佇列尾部,將 Thread-3 的 waitStatus 改為 -1。

// 如果節點狀態是取消, 返回 false 表示轉移失敗, 否則轉移成功
final boolean transferForSignal(Node node) {
    // CAS 修改當前節點的狀態,修改為 0,因為當前節點馬上要遷移到阻塞佇列了
    // 如果狀態已經不是 CONDITION, 說明執行緒被取消(await 釋放全部鎖失敗)或者被中斷(可打斷 cancelAcquire)
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        // 返回函數呼叫處繼續尋找下一個節點
        return false;
    
    // 【先改狀態,再進行遷移】
    // 將當前 node 入阻塞佇列,p 是當前節點在阻塞佇列的【前驅節點】
    Node p = enq(node);
    int ws = p.waitStatus;
    
    // 如果前驅節點被取消或者不能設定狀態為 Node.SIGNAL,就 unpark 取消當前節點執行緒的阻塞狀態, 
    // 讓 thread-0 執行緒競爭鎖,重新同步狀態
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

以上就是圖解Java ReentrantLock的條件變數Condition機制的詳細內容,更多關於ReentrantLock條件變數Condition機制的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com