首頁 > 軟體

AQS(AbstractQueuedSynchronizer)抽象佇列同步器及工作原理解析

2022-03-25 13:01:40

前言

AQS 絕對是JUC的重要基石,也是面試中經常被問到的,所以我們要搞清楚這個AQS到底是什麼?騎工作原理是什麼?

AQS是什麼?

AQS,AbstractQueuedSynchronizer,即佇列同步器。它是構建鎖或者其他同步元件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC並行包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。它是JUC並行包中的核心基礎元件,相比synchronized,synchronized缺少了獲取鎖與釋放鎖的可操作性,可中斷、超時獲取鎖。

是用來構建鎖或者其他同步器元件的重量級基礎框架及整個JUC體系的基石,通過內建的FIFO對列來完成資源獲取執行緒的排隊工作,並通過一個int型別變數表示持有鎖的狀態。

CLH佇列:CLH(Craig, Landin, and Hagersten)佇列是一個虛擬的雙向佇列(虛擬的雙向佇列即不存在佇列範例,僅存在結點之間的關聯關係)。AQS 是將每條請求共用資源的執行緒封裝成一個 CLH 鎖佇列的一個結點(Node)來實現鎖的分配。通過CAS完成對State值的修改。
同步對列的內部結構及繼承關係

用銀行辦理業務的案例模擬AQS如何進行執行緒管理和通知機制

1、初始化的時候,state = 0 (0 表示沒有人,1表示有人),執行緒池也有沒有人在執行,現在就是沒有顧客的時候,因此第一個執行緒去的時候都是公平鎖狀態直接到視窗辦理業務。

2、現在有一個執行緒Thread A進來那麼就直接到了業務視窗去辦理,並且通過CAS將state的值變成1

3、此時有一個執行緒Thread B 進來,通過getStatue() 方法檢視到state = 1,此時ThreadA有在佔用著,所以現在ThreadB執行緒就必須先入隊等待ThreadA結束後再呼叫,但是由於現在佇列是空的,所以ThreadB執行緒並不會馬上進入到佇列,他會先進入addWaiter() 方法到enq()這個方法中去,這個方法實質上就是一個自旋鎖,在這個方法中主要的時候先要實現一個佇列的初始化工作,先形成一個傀儡結點(哨兵結點)起到一個佔位的作用,然後才能將ThreadB執行緒掛在後面。

部分原始碼附上:

	private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;// 將tail的前指標賦值過去
     if (pred != null) { //一開始的時候並沒有指向所以位null,因此條件不成立不走裡面的語句
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {//CAS
             pred.next = node;
             return node;
         }
     }
     enq(node);//一開始會來走這個enq()這個方法。
     return node;
 }

因為當前的ThreadB執行緒進入時,tail的prev指標為null所以就會進enq()方法。如下:

	private Node enq(final Node node) {
     for (;;) {
         Node t = tail;
         if (t == null) { // Must initialize
             if (compareAndSetHead(new Node()))
                 tail = head;
         } else {
             node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
             }
         }
     }
 }

4、ThreadC執行緒開始入隊,那麼這個就是跟ThreadB是一樣的,只不過是說沒有了初始化那步了,直接掛線上程ThreadB上即可。

5、ThreadB 會去嘗試acquireQueued()這個方法,那麼第一次的時候會將哨兵結點的waitStatus = -1; 然後繼續自旋,進入到if條件的下一個條件中被呼叫park() 阻塞掉,等待著ThreadA 執行緒的unpark()。這樣才算真正的入列等待了。

	final boolean acquireQueued(final Node node, int arg) {
     boolean failed = true;
     try {
         boolean interrupted = false;
         for (;;) {
             final Node p = node.predecessor();
             if (p == head && tryAcquire(arg)) {
                 setHead(node);
                 p.next = null; // help GC
                 failed = false;
                 return interrupted;
             }
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt()) //第一次的時候會阻塞在前面的條件並將哨兵的waitState置為-1,第二次的話第一個條件為真,所以會走第二個判斷條件,會將存取的執行緒給阻塞掉等待unpark();
                 interrupted = true;
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
 }
	private final boolean parkAndCheckInterrupt() {
     LockSupport.park(this); //將存取次方法的執行緒給阻塞掉,等待unpark()方法喚醒
     return Thread.interrupted();
 }

6、當ThreadA執行緒辦好業務的時候那麼就會呼叫unlock()方法釋放鎖,unlock() 方法中又會呼叫sync.release()方法,並將當前視窗的執行緒變成null,state 置成0,通過呼叫 unparkSuccessor()方法將 傀儡結點的waitState = 0

	private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //釋放鎖喚醒等待佇列中的等待執行緒
    }

7、接著會呼叫unpark()方法將在前面等待的ThreadB執行緒給喚醒去視窗辦業務。將head結點指向ThreadB結點,ThreadB的prev結點為null,next結點也為null,ThreadB變成空結點,此節點就成了新的哨兵結點,原來的哨兵結點被GC回收。

8、ThreadC執行緒出列和上面的過程是一樣的。

結語

AQS的核心大致是這樣,如果說我講的你還是沒有聽懂的話,可以去B站看善矽谷陽哥的高頻面試題那集去看看。我只是將其做了一個簡化,用自己的話把這個過程複述出來,希望能對你有所幫助。

到此這篇關於AQS(AbstractQueuedSynchronizer)抽象佇列同步器的文章就介紹到這了,更多相關AQS抽象佇列同步器內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com