首頁 > 軟體

一文搞懂Java並行AQS的共用鎖模式

2022-10-18 14:01:38

概述

這篇文章深入淺出理解Java並行AQS的獨佔鎖模式講解了AQS的獨佔鎖實現原理,那麼本篇文章在闡述AQS另外一個重要模式,共用鎖模式,那什麼是共用鎖呢?

共用鎖可以由多個執行緒同時獲取, 比較典型的就是讀鎖,讀操作並不會產生副作用,所以可以允許多個執行緒同時對資料進行讀操作而不會有執行緒安全問題,jdk中的很多並行工具比如ReadWriteLock和CountdownLatch就是依賴AQS的共用鎖實現的。

本文重點講解下AQS是如何實現共用鎖的。

自定義共用鎖例子

首先我們通過AQS實現一個非常最最最輕量簡單的共用鎖例子,幫助大家對共用鎖有一個整體的感知。

@Slf4j
public class ShareLock {

    /**
     * 共用鎖幫助類
     */
    private static class ShareSync extends AbstractQueuedSynchronizer {

        private int lockCount;

        /**
         * 建立共用鎖幫助類,最多有count把共用鎖,超過了則阻塞
         *
         * @param count 共用鎖數量
         */
        public ShareSync(int count) {
           this.lockCount = count;
        }

        /**
         * 嘗試獲取共用鎖
         *
         * @param arg 每次獲取鎖的數量
         * @return 返回正數,表示後續其他執行緒獲取共用鎖可能成功; 返回0,表示後續其他執行緒無法獲取共用鎖;返回負數,表示當前執行緒獲取共用鎖失敗
         */
        @Override
        protected int tryAcquireShared(int arg) {
            // 自旋
            for (;;) {
                int c = getState();
                // 如果持有鎖的數量大於指定數量,返回-1,執行緒進入阻塞
                if(c >= lockCount) {
                    return -1;
                }
                int nextc = c + 1;
                // cas設定成功,返回1,獲取到共用鎖
                if (compareAndSetState(c, nextc)) {
                    return 1;
                }
            }
        }

        /**
         * 嘗試釋放共用鎖
         *
         * @param arg 釋放鎖的數量
         * @return 如果釋放後允許喚醒後續等待結點返回true,否則返回false
         */
        @Override
        protected boolean tryReleaseShared(int arg) {
            // 自旋操作
            for (; ; ) {
                int c = getState();
                // 如果沒有鎖了
                if (c == 0) {
                    return false;
                }
                // 否則鎖量-1
                int nextc = c - 1;
                // cas修改狀態
                if (compareAndSetState(c, nextc)) {
                    return true;
                }
            }
        }
    }

    private final ShareSync sync;

    public ShareLock(int count) {
        this.sync = new ShareSync(count);
    }

    /**
     * 加共用鎖
     */
    public void lockShare() {
        sync.acquireShared(1);
    }

    /**
     * 釋放共用鎖
     */
    public void releaseShare() {
        sync.releaseShared(1);
    }
}

建立內部類共用幫助鎖ShareSync類,繼承自AbstractQueuedSynchronizer類,實現了共用鎖相關的方法tryAcquireShared()tryReleaseShared()

建立ShareLock,提供了lockShare()加鎖和releaseShare()兩個API。

驗證:

public static void main(String[] args) throws InterruptedException {
        ShareLock shareLock = new ShareLock(3);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                shareLock.lockShare();
                try {
                    log.info("lock success");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    shareLock.releaseShare();
                    log.info("release success");
                }

            }, "thread-" + i).start();
        }
        Thread.sleep(10000);
    }
  • 一共建立最多共同有3個執行緒共用的共用鎖。
  • 建立5個執行緒去競爭共用鎖。

執行結果:

  • 執行結果顯示每次最多隻有3個lock success,說明同時只有3個執行緒共用。
  • 只有在釋放共用鎖以後,其他執行緒才能獲取鎖。

下面對它的實現原理一探究竟。

核心原理機制

共用模式也是由AQS提供的,首先我們關注下AQS的資料結構。

AQS內部維護了一個volatile int state(代表共用資源)和一個FIFO執行緒等待佇列(多執行緒爭用資源被阻塞時會進入此佇列)。

AQS作為一個抽象方法,提供了加鎖、和釋放鎖的框架,這裡採用的模板方模式,在上面中提到的tryAcquireSharedtryReleaseShared就是和共用模式相關的模板方法。

方法名描述
protected int tryAcquireShared(int arg)共用方式。arg為獲取鎖的次數,嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
protected boolean tryReleaseShared(int arg)共用方式。arg為釋放鎖的次數,嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回True,否則返回False。

共用模式的入口方法如下:

方法名描述
void acquireShared(int arg)共用模式獲取鎖,不響應中斷。
void acquireSharedInterruptibly(int arg)共用模式獲取鎖,響應中斷。
tryAcquireSharedNanos(int arg, long nanosTimeout)嘗試在共用模式下獲取鎖,如果中斷則中止,如果超過給定超時則失敗。
boolean releaseShared(int arg)共用模式下釋放鎖。

原始碼解析

上圖是AQS的類結構圖,其中標紅部分是組成AQS的重要成員變數。

成員變數

1.state共用變數

AQS中裡一個很重要的欄位state,表示同步狀態,是由volatile修飾的,用於展示當前臨界資源的獲鎖情況。通過getState(),setState(),compareAndSetState()三個方法進行維護。

關於state的幾個要點:

  • 使用volatile修飾,保證多執行緒間的可見性。
  • getState()、setState()、compareAndSetState()使用final修飾,限制子類不能對其重寫。
  • compareAndSetState()採用樂觀鎖思想的CAS演演算法,保證原子性操作。

2.CLH佇列(FIFO佇列)

AQS裡另一個重要的概念就是CLH佇列,它是一個雙向連結串列佇列,其內部由head和tail分別記錄頭結點和尾結點,佇列的元素型別是Node。

private transient volatile Node head;
private transient volatile Node tail;

Node的結構如下:

static final class Node {
    //共用模式下的等待標記
    static final Node SHARED = new Node();
    //獨佔模式下的等待標記
    static final Node EXCLUSIVE = null;
    //表示當前結點已取消排程。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀態,進入該狀態後的結點將不會再變化。
    static final int CANCELLED =  1;
    //表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新為SIGNAL。
    static final int SIGNAL    = -1;
    //表示結點等待在Condition上,當其他執行緒呼叫了Condition的signal()方法後,CONDITION狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖。
    static final int CONDITION = -2;
    //共用模式下,前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。
    static final int PROPAGATE = -3;
    //狀態,包括上面的四種狀態值,初始值為0,一般是節點的初始狀態
    volatile int waitStatus;
    //上一個節點的參照
    volatile Node prev;
    //下一個節點的參照
    volatile Node next;
    //儲存在當前節點的執行緒參照
    volatile Thread thread;
    //condition佇列的後續節點
    Node nextWaiter;
}

注意,waitSstatus負值表示結點處於有效等待狀態,而正值表示結點已被取消。所以原始碼中很多地方用>0、<0來判斷結點的狀態是否正常。

3.exclusiveOwnerThread

AQS通過繼承AbstractOwnableSynchronizer類,擁有的屬性。表示獨佔模式下同步器持有的執行緒。

共用鎖獲取acquireShared(int)

acquireShared(int)是共用鎖模式下執行緒獲取共用資源的入口方法,它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待佇列,直到獲取到資源為止,整個過程無法響應中斷。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

方法的整體流程如下:

  • tryAcquireShared()嘗試獲取資源,需要自定義同步器去實現,返回負值代表獲取失敗;0代表獲取成功,但沒有剩餘資源;正數表示獲取成功,還有剩餘資源,其他執行緒還可以去獲取。
  • 如果失敗則通過doAcquireShared()進入等待佇列,直到獲取到資源為止才返回。

doAcquireShared(int)

此方法用於將當前執行緒加入等待佇列尾部休息,直到其他執行緒釋放資源喚醒自己,自己成功拿到相應量的資源後才返回。

private void doAcquireShared(int arg) {
    //封裝執行緒為共用Node 加入佇列尾部
    final Node node = addWaiter(Node.SHARED);
    //是否成功標誌
    boolean failed = true;
    try {
        //等待過程中是否被中斷過的標誌
        boolean interrupted = false;
        // 自旋操作
        for (;;) {
            // 獲取前驅節點
            final Node p = node.predecessor();
            //如果到head的下一個,因為head是拿到資源的執行緒,此時node被喚醒,很可能是head用完資源來喚醒自己的
            if (p == head) {
                //嘗試獲取資源
                int r = tryAcquireShared(arg);
                //成功
                if (r >= 0) {
                    //將head指向自己,還有剩餘資源可以再喚醒之後的執行緒
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    //如果等待過程中被打斷過,此時將中斷補上。
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }

            //判斷狀態,尋找安全點,進入waiting狀態,等著被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared方法的實現和獲取獨佔鎖中的acquireQueued方法很類似,但是主要有一點不同,那就是執行緒在被喚醒後,若成功獲取到了共用鎖,還需要判斷共用鎖是否還能被其他執行緒獲取,若可以,則繼續向後喚醒它的下一個節點對應的執行緒。

setHeadAndPropagate(Node, int)

該方法主要將當前節點設定為頭節點,同時判斷條件是否符合(比如還有剩餘資源),還會去喚醒後繼結點,畢竟是共用模式。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    //head指向自己
    setHead(node);
     //如果還有剩餘量,繼續喚醒下一個鄰居執行緒
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 喚醒操作
            doReleaseShared();
    }
}

共用釋放releaseShared(int)

releaseShared(int)是共用模式下執行緒釋放共用資源的入口,它會釋放指定量的資源,如果成功釋放且允許喚醒等待執行緒,它會喚醒等待佇列裡的其他執行緒來獲取資源。

public final boolean releaseShared(int arg) {
    //嘗試釋放資源
    if (tryReleaseShared(arg)) {
        //喚醒後繼結點
        doReleaseShared();
        return true;
    }
    return false;
}

方法的整體流程如下:

  • tryReleaseShared嘗試釋放鎖,這由自定義同步器去實現, 返回true表示釋放成功。
  • doReleaseShared喚醒後續佇列中等待的節點,

doReleaseShared()

此方法主要用於喚醒佇列中等待的共用節點。

private void doReleaseShared() {
    // 自旋操作
    for (;;) {
        // 獲取頭節點
        Node h = head;
        if (h != null && h != tail) {
            // 獲取節點的等待狀態
            int ws = h.waitStatus;
            // 如果節點等待狀態是-1, -1表示有責任喚醒後續節點的狀態
            if (ws == Node.SIGNAL) {
                // cas修改當前節點的等待狀態為0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                //喚醒後續節點
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head發生變化
            break;
    }
}

邏輯是一個死迴圈,每次迴圈中重新讀取一次head,然後儲存在區域性變數h中,再配合if(h == head) break;,這樣,迴圈檢測到head沒有變化時就會退出迴圈。注意,head變化一定是因為:acquire thread被喚醒,之後它成功獲取鎖,然後setHead設定了新head。而且注意,只有通過if(h == head) break;即head不變才能退出迴圈,不然會執行多次迴圈。

if (h != null && h != tail)判斷佇列是否至少有兩個node,如果佇列從來沒有初始化過(head為null),或者head就是tail,那麼中間邏輯直接不走,直接判斷head是否變化了。

如果佇列中有兩個或以上個node,那麼檢查區域性變數h的狀態:

  • 如果狀態為SIGNAL,說明h的後繼是需要被通知的。通過對CAS操作結果取反,將compareAndSetWaitStatus(h, Node.SIGNAL, 0)unparkSuccessor(h)繫結在了一起。說明了只要head成功得從SIGNAL修改為0,那麼head的後繼的代表執行緒肯定會被喚醒了。
  • 如果狀態為0,說明h的後繼所代表的執行緒已經被喚醒或即將被喚醒,並且這個中間狀態即將消失,要麼由於acquire thread獲取鎖失敗再次設定head為SIGNAL並再次阻塞,要麼由於acquire thread獲取鎖成功而將自己(head後繼)設定為新head並且只要head後繼不是隊尾,那麼新head肯定為SIGNAL。所以設定這種中間狀態的head的status為PROPAGATE,讓其status又變成負數,這樣可能被被喚醒執行緒檢測到。

如果狀態為PROPAGATE,直接判斷head是否變化。

兩個continue保證了進入那兩個分支後,只有當CAS操作成功後,才可能去執行if(h == head) break;,才可能退出迴圈。

if(h == head) break;保證了,只要在某個迴圈的過程中有執行緒剛獲取了鎖且設定了新head,就會再次迴圈。目的當然是為了再次執行unparkSuccessor(h),即喚醒佇列中第一個等待的執行緒。

以上就是一文搞懂Java並行AQS的共用鎖模式的詳細內容,更多關於Java AQS共用鎖模式的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com