首頁 > 軟體

Java 多執行緒並行AbstractQueuedSynchronizer詳情

2022-06-16 14:00:51

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 簡稱 AQS ,抽象佇列同步器,用來實現依賴於先進先出(FIFO)等待佇列的阻塞鎖和相關同步器的框架。這個類旨在為大多數依賴單個原子 int 值來表示同步狀態的同步器提供基礎的能力封裝。 例如 ReentrantLock、Semaphore 和 FutureTask 等等都是基於 AQS 實現的,我們也可以繼承 AQS 實現自定義同步器。

核心思想

網路上常見的解釋是:

如果被請求的共用資源空閒,則將當前請求資源的執行緒設定為有效的工作執行緒,並且將共用資源設定為鎖定狀態。如果被請求的共用資源被佔用,那麼就需要一套執行緒阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH佇列鎖實現的,即將暫時獲取不到鎖的執行緒加入到佇列中。

個人理解,可以把 AQS 當成一把鎖,它內部通過一個佇列記錄了所有要使用鎖的請求執行緒,並且管理鎖自己當前的狀態(鎖定、空閒等狀態)。相當於 AQS 就是共用資源本身,當有執行緒請求這個資源是,AQS 將請求資源的執行緒記錄當前工作執行緒,並將自身設定為鎖定狀態。後續其他執行緒請求這個 AQS 時,將請求執行緒記錄到等待佇列中,其他執行緒此時未獲取到鎖,進入阻塞等待狀態。

為什麼需要 AQS

在深入 AQS 前,我們應該持有一個疑問是為什麼需要 AQS ?synchronized 關鍵字和 CAS 原子類都提供了豐富的同步方案了。

但在實際的需求中,對同步的需求是各式各樣的,比如,我們需要對一個鎖加上超時時間,那麼光憑 synchronized 關鍵字或是 CAS 就無法實現了,需要對其進行二次封裝。而 JDK 中提供了豐富的同步方案,比如 ReentrantLock ,而 ReentrantLock 是就是基於 AQS 實現的。

用法

這部分內容來自 JDK 的註釋

要將此類用作同步器的基礎,請在適用時重新定義以下方法,方法是使用 getState、setState 和/或 compareAndSetState 檢查和/或修改同步狀態:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

預設情況下,這些方法中的每一個都會引發 UnsupportedOperationException。 這些方法的實現必須是內部執行緒安全的,並且通常應該是短暫的而不是阻塞的。 定義這些方法是使用此類的唯一受支援的方法。 所有其他方法都被宣告為最終方法,因為它們不能獨立變化。

您可能還會發現從 AbstractOwnableSynchronizer 繼承的方法對於跟蹤擁有獨佔同步器的執行緒很有用。 鼓勵您使用它們——這使監視和診斷工具能夠幫助使用者確定哪些執行緒持有鎖。

即使此類基於內部 FIFO 佇列,它也不會自動執行 FIFO 採集策略。

獨佔同步的核心形式為:

   Acquire:
       while (!tryAcquire(arg)) {
          enqueue thread if it is not already queued;
          possibly block current thread;
       }
  
   Release:
       if (tryRelease(arg))
          unblock the first queued thread;

(共用模式類似,但可能涉及級聯訊號。)

因為在入隊之前呼叫了獲取中的檢查,所以新獲取的執行緒可能會搶在其他被阻塞和排隊的執行緒之前。 但是,如果需要,您可以定義 tryAcquire 和/或 tryAcquireShared 以通過內部呼叫一個或多個檢查方法來禁用插入,從而提供公平的 FIFO 獲取順序。 特別是,如果 hasQueuedPredecessors(一種專門為公平同步器使用的方法)返回 true,大多數公平同步器可以定義 tryAcquire 返回 false。 其他變化是可能的。

預設插入(也稱為貪婪、放棄和避免護送)策略的吞吐量和可延伸性通常最高。 雖然這不能保證公平或無飢餓,但允許較早排隊的執行緒在較晚的排隊執行緒之前重新競爭,並且每次重新競爭都有無偏見的機會成功對抗傳入執行緒。 此外,雖然獲取不是通常意義上的“旋轉”,但它們可能會在阻塞之前執行多次呼叫 tryAcquire 並穿插其他計算。 當獨佔同步只是短暫地保持時,這提供了自旋的大部分好處,而沒有大部分責任。 如果需要,您可以通過預先呼叫獲取具有“快速路徑”檢查的方法來增加這一點,可能會預先檢查 hasContended 和/或 hasQueuedThreads 以僅在同步器可能不會被爭用時才這樣做。

此類通過將其使用範圍專門用於可以依賴 int 狀態、獲取和釋放引數以及內部 FIFO 等待佇列的同步器,部分地為同步提供了高效且可延伸的基礎。 如果這還不夠,您可以使用原子類、您自己的自定義 java.util.Queue 類和 LockSupport 阻塞支援從較低階別構建同步器。

用法範例

這是一個不可重入互斥鎖類,它使用值 0 表示未鎖定狀態,使用值 1 表示鎖定狀態。 雖然不可重入鎖並不嚴格要求記錄當前所有者執行緒,但無論如何,此類都會這樣做以使使用情況更易於監控。

它還支援條件並公開一些檢測方法:

class Mutex implements Lock, java.io.Serializable {
​
   // Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     // Acquires the lock if state is zero
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }
​
     // Releases the lock by setting state to zero
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (!isHeldExclusively())
         throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }​
     // Reports whether in locked state
     public boolean isLocked() {
       return getState() != 0;
     }
     public boolean isHeldExclusively() {
       // a data race, but safe due to out-of-thin-air guarantees
       return getExclusiveOwnerThread() == Thread.currentThread();
     }​
     // Provides a Condition
     public Condition newCondition() {
       return new ConditionObject();
     }
     // Deserializes properly
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
     }
   }
   // The sync object does all the hard work. We just forward to it.
   private final Sync sync = new Sync();
   public void lock()              { sync.acquire(1); }
   public boolean tryLock()        { return sync.tryAcquire(1); }
   public void unlock()            { sync.release(1); }
   public Condition newCondition() { return sync.newCondition(); }
   public boolean isLocked()       { return sync.isLocked(); }
   public boolean isHeldByCurrentThread() {
     return sync.isHeldExclusively();
   }
   public boolean hasQueuedThreads() {
     return sync.hasQueuedThreads();
   }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

這是一個類似於 CountDownLatch 的鎖存器類,只是它只需要一個訊號即可觸發。 因為鎖存器是非獨佔的,所以它使用共用的獲取和釋放方法。

 class BooleanLatch {​
   private static class Sync extends AbstractQueuedSynchronizer {
     boolean isSignalled() { return getState() != 0; }
     protected int tryAcquireShared(int ignore) {
       return isSignalled() ? 1 : -1;
     }
     protected boolean tryReleaseShared(int ignore) {
       setState(1);
       return true;
     }
   }
   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
   }
 }

AQS 底層原理

父類別 AbstractOwnableSynchronizer

AbstractQueuedSynchronizer 繼承自 AbstractOwnableSynchronizer ,後者邏輯十分簡單:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {​
    private static final long serialVersionUID = 3737899427754241961L;​
    protected AbstractOwnableSynchronizer() { }
    private transient Thread exclusiveOwnerThread;
    // 設定當前持有鎖的執行緒
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AbstractOwnableSynchronizer 只是定義了設定持有鎖的執行緒的能力。

CLH 佇列

AQS 的等待佇列是 CLH (Craig , Landin , and Hagersten) 鎖定佇列的變體,CLH 鎖通常用於自旋鎖。AQS 將每個請求共用資源的執行緒封裝程一個 CLH 節點來實現的,這個節點的定義是:

    /** CLH Nodes */
    abstract static class Node {
        volatile Node prev;       // initially attached via casTail
        volatile Node next;       // visibly nonnull when signallable
        Thread waiter;            // visibly nonnull when enqueued
        volatile int status;      // written by owner, atomic bit ops by others
​
        // methods for atomic operations
        final boolean casPrev(Node c, Node v) {  // for cleanQueue
            return U.weakCompareAndSetReference(this, PREV, c, v); // 通過 CAS 確保同步設定 prev 的值
        }
        final boolean casNext(Node c, Node v) {  // for cleanQueue
            return U.weakCompareAndSetReference(this, NEXT, c, v);
        }
        final int getAndUnsetStatus(int v) {     // for signalling
            return U.getAndBitwiseAndInt(this, STATUS, ~v);
        }
        final void setPrevRelaxed(Node p) {      // for off-queue assignment
            U.putReference(this, PREV, p);
        }
        final void setStatusRelaxed(int s) {     // for off-queue assignment
            U.putInt(this, STATUS, s);
        }
        final void clearStatus() {               // for reducing unneeded signals
            U.putIntOpaque(this, STATUS, 0);
        }
        private static final long STATUS = U.objectFieldOffset(Node.class, "status");
        private static final long NEXT = U.objectFieldOffset(Node.class, "next");
        private static final long PREV = U.objectFieldOffset(Node.class, "prev");
    }

CLH 的節點的資料結構是一個雙向連結串列的節點,只不過每個操作都是經過 CAS 確保執行緒安全的。要加入 CLH 鎖佇列,您可以將其自動拼接為新的尾部;要出隊,需要設定 head 欄位,以便下一個符合條件的等待節點成為新的頭節點:

 +------+  prev +-------+  prev +------+
 |      | <---- |       | <---- |      |
 | head | next  | first | next  | tail |
 |      | ----> |       | ----> |      |
 +------+       +-------+       +------+

Node 中的 status 欄位表示當前節點代表的執行緒的狀態。

status 存在三種狀態:

    static final int WAITING   = 1;          // must be 1
    static final int CANCELLED = 0x80000000; // must be negative 
    static final int COND      = 2;          // in a condition wait
  • WAITING:表示等待狀態,值為 1。
  • CANCELLED:表示當前執行緒被取消,為 0x80000000。
  • COND:表示當前節點在等待條件,也就是在條件等待佇列中,值為 2。

在上面的 COND 中,提到了一個條件等待佇列的概念。

首先,Node 是一個靜態抽象類,它在 AQS 中存在三種實現類:

  • ExclusiveNode
  • SharedNode
  • ConditionNode

前兩者都是空實現:

    static final class ExclusiveNode extends Node { }
    static final class SharedNode extends Node { }

而最後的 ConditionNode 多了些內容:

    static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
        ConditionNode nextWaiter; 
        // 檢查執行緒是否中斷或當前執行緒的狀態已取消等待。
        public final boolean isReleasable() {
            return status <= 1 || Thread.currentThread().isInterrupted();
        }
​
        public final boolean block() {
            while (!isReleasable()) LockSupport.park();
            return true;
        }
    }

ConditionNode 拓展了兩個方法:

  • 檢查執行緒狀態是否處於等待。
  • 阻塞當前執行緒:當前執行緒正在等待執行,通過 LockSupport.park() 阻塞當前執行緒。這裡通過 while 迴圈持續重試,嘗試阻塞執行緒。

而到這一步,所有的資訊都指向了一個相關的類 Condition 。

Condition

AQS 中的 Condition 的實現是內部類 ConditionObject :

public class ConditionObject implements Condition, java.io.Serializable 

ConditionObject 實現了 Condition 介面和序列化介面,後者說明了該型別的物件可以進行序列化。而前者 Condition 介面,定義了一些行為能力:

public interface Condition {
    void await() throws InterruptedException;​
    void awaitUninterruptibly();​
    long awaitNanos(long nanosTimeout) throws InterruptedException;​
    boolean await(long time, TimeUnit unit) throws InterruptedException;​
    boolean awaitUntil(Date deadline) throws InterruptedException;​
    void signal();
    void signalAll();
}

Condition 中定義的能力與 Java 的 Object 類中提供的同步相關方法(wait、notify 和 notifyAll) 代表的能力極為相似。前者提供了更豐富的等待方法。類比的角度來看,如果 Object 是配合 synchronized 關鍵字使用的,那麼 Condition 就是用來配合基於 AQS 實現的鎖來使用的介面。

可以將 Condition 的方法分為兩組:等待和喚醒。

用於等待的方法

// 等待,當前執行緒在接到訊號或被中斷之前一直處於等待狀態    
void await() throws InterruptedException;
// 等待,當前執行緒在接到訊號之前一直處於等待狀態,不響應中斷
void awaitUninterruptibly();
//等待,當前執行緒在接到訊號、被中斷或到達指定等待時間之前一直處於等待狀態 
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 等待,當前執行緒在接到訊號、被中斷或到達指定等待時間之前一直處於等待狀態。
// 此方法在行為上等效於: awaitNanos(unit.toNanos(time)) > 0
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 等待,當前執行緒在接到訊號、被中斷或到達指定最後期限之前一直處於等待狀態    
boolean awaitUntil(Date deadline) throws InterruptedException;

用於喚醒的方法

// 喚醒一個等待執行緒。如果所有的執行緒都在等待此條件,則選擇其中的一個喚醒。在從 await 返回之前,該執行緒必須重新獲取鎖。
void signal();
// 喚醒所有等待執行緒。如果所有的執行緒都在等待此條件,則喚醒所有執行緒。在從 await 返回之前,每個執行緒都必須重新獲取鎖。
void signalAll();

ConditionObject

分析完 Condition ,繼續來理解 ConditionObject。 ConditionObject 是 Condition 在 AQS 中的實現:

public class ConditionObject implements Condition, java.io.Serializable {
    /** condition 佇列頭節點 */
    private transient ConditionNode firstWaiter;
    /** condition 佇列尾節點 */
    private transient ConditionNode lastWaiter;
    // ---- Signalling methods ----
    // 移除一個或所有等待者並將其轉移到同步佇列。
    private void doSignal(ConditionNode first, boolean all)
    public final void signal()
    public final void signalAll()​
    // ---- Waiting methods ----
    // 將節點新增到條件列表並釋放鎖定。
    private int enableWait(ConditionNode node)
    // 如果最初放置在條件佇列中的節點現在準備好重新獲取同步佇列,則返回 true。
    private boolean canReacquire(ConditionNode node) ​
    // 從條件佇列中取消連結給定節點和其他非等待節點,除非已經取消連結。
    private void unlinkCancelledWaiters(ConditionNode node) 
    // 實現不可中斷的條件等待
    public final void awaitUninterruptibly()​
    public final void await()​
    public final long awaitNanos(long nanosTimeout)​
    public final boolean awaitUntil(Date deadline)​
    public final boolean await(long time, TimeUnit unit)​
    //  ---- support for instrumentation ----​
    // 如果此條件是由給定的同步物件建立的,則返回 true。
    final boolean isOwnedBy(AbstractQueuedSynchronizer sync)​
    // 查詢是否有執行緒在此條件下等待。
    protected final boolean hasWaiters()​
    // 返回在此條件下等待的執行緒數的估計值。
    protected final int getWaitQueueLength()
    // 返回一個集合,其中包含可能正在等待此 Condition 的那些執行緒。
    protected final Collection<Thread> getWaitingThreads()
}

ConditionObject 實現了 Condition 能力的基礎上,拓展了對 ConditionNode 相關的操作,方法通過其用途可以劃分為三組:

  • Signalling
  • Waiting
  • 其他方法

Signalling methods

        public final void signal() {
            ConditionNode first = firstWaiter;
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            if (first != null)
                doSignal(first, false);
        }
        public final void signalAll() {
            ConditionNode first = firstWaiter;
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            if (first != null)
                doSignal(first, true);
        }

喚醒方法主要邏輯是通過 doSignal(ConditionNode first, boolean all) 實現的。doSignal 方法根據引數,進行一個 while 迴圈,

兩個方法傳遞進來的都是頭節點,也就是從 ConditionNode 雙向連結串列的頭節點開始遍歷,如果第二個引數 all 設定為 false ,只執行一次遍歷中邏輯。迴圈中的邏輯是:

// 最終都呼叫了這個方法
private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        // 取出 first 的下一個節點,設定為 next
        ConditionNode next = first.nextWaiter; 
        // 如果 first 是連結串列中唯一的一個節點,設定 lastWaiter 為 null
        if ((firstWaiter = next) == null) // 
            lastWaiter = null;
        // 讀取 first 的 status ,檢查是否是 COND
        if ((first.getAndUnsetStatus(COND) & COND) != 0) { 
            // first 處於 COND 狀態,出隊
            enqueue(first); 
            // 通過 all 來判斷是否將等待的執行緒都進行喚醒邏輯。
            if (!all)
                break;  
        }
        first = next; // 迴圈指向下一個
    }
}

關鍵方法 enqueue(ConditionNode) 是 AQS 中的方法:

    final void enqueue(Node node) {
        if (node != null) {
            for (;;) {
                // 獲取尾節點
                Node t = tail; 
                // 避免不必要的記憶體屏障
                node.setPrevRelaxed(t); 
                if (t == null)      
                    // 空佇列首先初始化一個頭節點
                    tryInitializeHead();  
                else if (casTail(t, node)) { // 更新 tail 指標為 node (這裡不是將 t = node)
                    t.next = node; // 為節點 t 的 next 指標指向 node
                    if (t.status < 0)  // t 的狀態 < 0 一般代表後續節點需要執行了
                        LockSupport.unpark(node.waiter);
                    break;
                }
            }
        }
    }

可以看出 enqueue(ConditionNode) 中本質上是通過呼叫 LockSupport.unpark(node.waiter); 來喚醒執行緒的。

Waiting methods

對外提供的等待能力的方法包括:

    // 實現不可中斷的條件等待
    public final void awaitUninterruptibly()
    public final void await()​
    public final long awaitNanos(long nanosTimeout)​
    public final boolean awaitUntil(Date deadline)
    public final boolean await(long time, TimeUnit unit)

它們內部都用到了公共的邏輯:

    // 新增節點到 condition 列表並釋放鎖
    private int enableWait(ConditionNode node)
    private boolean canReacquire(ConditionNode node) 
    private void unlinkCancelledWaiters(ConditionNode node) 

enableWait

        private int enableWait(ConditionNode node) {
            if (isHeldExclusively()) { // 如果是當前執行緒持有鎖資源
                node.waiter = Thread.currentThread();  // 將節點的繫結的執行緒設定為當前執行緒
                node.setStatusRelaxed(COND | WAITING); // 設定節點狀態
                ConditionNode last = lastWaiter;       // 獲取 尾節點
                if (last == null)
                    firstWaiter = node;                // 如果列表為空, node 就是頭節點
                else
                    last.nextWaiter = node;            // 否則,將尾節點的下一個節點設定為 node
                lastWaiter = node;                     // 更新 lastWaiter 指標
                int savedState = getState();           // 獲取當前執行緒的同步狀態
                if (release(savedState))               // 在當前持有鎖資源的執行緒嘗試釋放鎖
                    return savedState;
            }
            node.status = CANCELLED; // 當前執行緒未持有鎖資源,更新 node 的狀態為 CANCELLED
            throw new IllegalMonitorStateException(); // 並丟擲 IllegalMonitorStateException
        }

這個方法對傳入的節點插入到等待佇列的隊尾,並根據當前執行緒的狀態進行了檢查。關鍵方法的 release(int) :

    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 嘗試釋放鎖資源
            signalNext(head);  // 釋放成功,喚醒下一個等待中的執行緒
            return true;
        }
        return false;
    }

喚醒給定節點的下一個節點(如果存在),通過呼叫 LockSupport.unpark(s.waiter) 喚醒節點對應的執行緒。

    private static void signalNext(Node h) {
        Node s;
        if (h != null && (s = h.next) != null && s.status != 0) {
            s.getAndUnsetStatus(WAITING);
            LockSupport.unpark(s.waiter);
        }
    }

canReacquire

檢查傳入的 node 是否在連結串列中,且不為頭節點:

// 如果最初放置在條件佇列中的節點現在準備好重新獲取同步佇列,則返回 true。
private boolean canReacquire(ConditionNode node) {
    // 檢查傳入的 node 是否在連結串列中,且不為頭節點
    return node != null && node.prev != null && isEnqueued(node);
}
// in AQS 
final boolean isEnqueued(Node node) {
    // 從 Node 雙向連結串列尾部開始遍歷,是否存在 node
    for (Node t = tail; t != null; t = t.prev)
        if (t == node)
            return true;
    return false;
}

unlinkCancelledWaiters

        private void unlinkCancelledWaiters(ConditionNode node) {
            // node 為空 / node 不是隊尾 / node 是最後一個節點
            if (node == null || node.nextWaiter != null || node == lastWaiter) {
                ConditionNode w = firstWaiter, trail = null; // w = first , trail = null
                // /從連結串列頭節點開始遍歷
                while (w != null) { 
                    ConditionNode next = w.nextWaiter;  // 取出下一個節點
                    if ((w.status & COND) == 0) {       // 當前節點的狀態包含 COND
                        w.nextWaiter = null;            // 當前節點的 next 設定為 null 
                        if (trail == null)              // 如果 trail 指標為空
                            firstWaiter = next;         // firstWaiter 指向 next
                        else
                            trail.nextWaiter = next;    // trail 指標不為空,尾指標的 next 指向當前節點的下一個節點 
                        if (next == null)
                            lastWaiter = trail; // 最後將 lastWaiter 設定為 trail (過濾後的 trail 連結串列插入到隊尾)
                    } else
                        trail = w; // 頭節點狀態不是 COND,當前節點設定為 trail 指標。
                    w = next; // 下一個迴圈
                } 
            }
        }

這個方法遍歷 ConditionNode 佇列,過濾掉狀態不包含 COND 的節點。

對外提供的等待方法

上面三個方法是內部處理邏輯。而對外暴露的是以下五個方法:

    public final void awaitUninterruptibly()​
    public final void await()​
    public final long awaitNanos(long nanosTimeout)​
    public final boolean awaitUntil(Date deadline)​
    public final boolean await(long time, TimeUnit unit)

除了awaitUninterruptibly() ,其他方法所代表的能力和 Condition 介面中定義的所代表的能力基本一致。

awaitUninterruptibly

awaitUninterruptibly() 是用於實現不可中斷的條件等待:

        public final void awaitUninterruptibly() {
            ConditionNode node = new ConditionNode(); // 建立一個新的 node
            int savedState = enableWait(node);        // 將這個新 node 插入,並返回 node 的狀態
            LockSupport.setCurrentBlocker(this);      // 設定 blocker
            boolean interrupted = false, rejected = false;  // flag:中斷和拒絕
            while (!canReacquire(node)) {             // 當前執行緒關聯的 node 不再等待佇列      
                if (Thread.interrupted())             // 嘗試中斷執行緒
                    interrupted = true;
                else if ((node.status & COND) != 0) {  // 中斷執行緒不成功的情況下,如果 node 狀態包含 COND
                    // 嘗試阻塞執行緒
                    try {
                        if (rejected)  
                            node.block(); // 實際上也是 LockSupport.park
                        else
                            ForkJoinPool.managedBlock(node); 
                    } catch (RejectedExecutionException ex) {
                        rejected = true;    // 拒絕執行
                    } catch (InterruptedException ie) {
                        interrupted = true;   // 中斷
                    }
                } else
                    Thread.onSpinWait();        // 當前執行緒無法繼續執行
            }
            // 不是佇列中的唯一節點時執行下面邏輯
            LockSupport.setCurrentBlocker(null); 
            node.clearStatus();   // 清除 node 的 status 
            acquire(node, savedState, false, false, false, 0L); // 【*】重點方法
            if (interrupted)
                Thread.currentThread().interrupt();
        }

在這個方法中,首先講解兩個方法:

  • Thread.onSpinWait() 表示呼叫者暫時無法繼續,直到其他活動發生一個或多個動作。 通過在自旋等待迴圈構造的每次迭代中呼叫此方法,呼叫執行緒向執行時指示它正忙於等待。 執行時可能會採取措施來提高呼叫自旋等待迴圈構造的效能。
  • ForkJoinPool.managedBlock(node) 則是通過 Blocker 來檢查執行緒的執行狀態,然後嘗試阻塞執行緒。

最後是最關鍵的方法 acquire ,它的詳細邏輯放到最後講解, 這個方法的作用就是,當前執行緒進入等待後,需要將關聯的執行緒開啟一個自旋,掛起後能夠持續去嘗試獲取鎖資源。

await

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            ConditionNode node = new ConditionNode();
            int savedState = enableWait(node);
            LockSupport.setCurrentBlocker(this); // for back-compatibility
            boolean interrupted = false, cancelled = false, rejected = false;
            while (!canReacquire(node)) {
                if (interrupted |= Thread.interrupted()) {
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break;              // else interrupted after signal
                } else if ((node.status & COND) != 0) {
                    try {
                        if (rejected)
                            node.block();
                        else
                            ForkJoinPool.managedBlock(node);
                    } catch (RejectedExecutionException ex) {
                        rejected = true;
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                } else
                    Thread.onSpinWait();    // awoke while enqueuing
            }
            LockSupport.setCurrentBlocker(null);
            node.clearStatus();
            acquire(node, savedState, false, false, false, 0L);
            if (interrupted) {
                if (cancelled) {
                    unlinkCancelledWaiters(node);
                    throw new InterruptedException();
                }
                Thread.currentThread().interrupt();
            }
        }

await() 方法相較於 awaitUninterruptibly(),while 邏輯基本一致,最後多了一步 cancelled 狀態檢查,如果 cancelled = true ,呼叫 unlinkCancelledWaiters(node),去清理等待佇列。

awaitNanos

awaitNanos(long) 在 await() 之上多了對超時時間的計算和處理邏輯:

        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            ConditionNode node = new ConditionNode();
            int savedState = enableWait(node);
            long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
            long deadline = System.nanoTime() + nanos;
            boolean cancelled = false, interrupted = false;
            while (!canReacquire(node)) {
                if ((interrupted |= Thread.interrupted()) ||
                    (nanos = deadline - System.nanoTime()) <= 0L) { // 多了一個超時條件
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break;
                } else
                    LockSupport.parkNanos(this, nanos);
            }
            node.clearStatus();
            acquire(node, savedState, false, false, false, 0L);
            if (cancelled) {
                unlinkCancelledWaiters(node);
                if (interrupted)
                    throw new InterruptedException();
            } else if (interrupted)
                Thread.currentThread().interrupt();
            long remaining = deadline - System.nanoTime(); // avoid overflow
            return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
        }

awaitUntil

awaitUntil(Date) 和 awaitNanos(long) 同理,只是將超時計算改成了日期計算:

            long abstime = deadline.getTime();
            // ...
            boolean cancelled = false, interrupted = false;
            while (!canReacquire(node)) {
                if ((interrupted |= Thread.interrupted()) ||
                    System.currentTimeMillis() >= abstime) { // 時間檢查
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break;
                } else
                    LockSupport.parkUntil(this, abstime);
            }

await(long, TimeUnit)

await(long, TimeUnit) 則是邏輯更加與 awaitNanos(long) 相似了, 只是多了一步計算 awaitNanos(long nanosTimeout) 中的引數 nanosTimeout 的操作:

long nanosTimeout = unit.toNanos(time);

acquire 方法

在 wait 方法組中,最終都會呼叫到這個邏輯:

    final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // 在取消第一個執行緒時重試
        boolean interrupted = false, first = false;
        Node pred = null;                // 入隊時節點的前一個指標
        /*
         * 反覆執行:
         *  檢查當前節點是否是 first
         *  若是, 確保 head 穩定,否則確保有效的 prev
         *  如果節點是第一個或尚未入隊,嘗試獲取
         *  否則,如果節點尚未建立,則建立這個它
         *  否則,如果節點尚未入隊,嘗試入隊一次
         *  否則,如果通過 park 喚醒,重試,最多 postSpins 次
         *  否則,如果 WAITING 狀態未設定,設定並重試
         *  否則,park 並且清除 WAITING 狀態, 檢查取消邏輯
         */
        for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            if (node == null) {                 // allocate; retry before enqueue
                if (shared)
                    node = new SharedNode();
                else
                    node = new ExclusiveNode();
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                Node t = tail;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (t == null)
                    tryInitializeHead();
                else if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted, interruptible);
    }

這個方法會在 Node 關聯的執行緒讓出鎖資源後,開啟一個死迴圈嘗試通過 tryAcquire 嘗試獲取鎖資源,最後如果超時或嘗試次數超出限制,會通過 LockSupport.park 阻塞自身。

到此這篇關於Java 多執行緒並行AbstractQueuedSynchronizer詳情的文章就介紹到這了,更多相關Java AbstractQueuedSynchronizer內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com