首頁 > 軟體

ReentrantLock從原始碼解析Java多執行緒同步學習

2023-10-16 22:00:24

前言

如今多執行緒程式設計已成為了現代軟體開發中的重要部分,而並行程式設計中的執行緒同步問題更是一道難以逾越的坎。在Java語言中,synchronized是最基本的同步機制,但它也存在著許多問題,比如可重入性不足、死鎖等等。為了解決這些問題,Java提供了更加高階的同步機制——ReentrantLock。

管程

管程(Monitor)是一種用於實現多執行緒同步的抽象資料型別,它可以用來協調不同執行緒之間的互斥和同步存取共用資源。通俗地說,管程就像一個門衛,控制著進入某個共用資源區域的執行緒數量和時間,以避免多個執行緒同時存取導致的資料競爭和混亂。

管程模型

  • Mesa管程模型:由美國電腦科學家Dijkstra提出,是最流行的管程模型之一。在Mesa管程模型中,每個管程也有一個條件變數和等待佇列,但與Hoare管程不同的是,當一個執行緒請求進入管程時,如果條件不滿足,該執行緒並不會立即被阻塞,而是繼續執行後續操作,直到該執行緒主動放棄鎖資源或者其他執行緒喚醒它。
  • Hoare管程模型:由英國電腦科學家C.A.R. Hoare提出,是最早的管程模型之一。在Hoare管程模型中,每個管程都有一個條件變數和一個等待佇列,當一個執行緒請求進入管程時,如果條件不滿足,該執行緒就會被阻塞並加入等待佇列,直到條件滿足後才被喚醒。
  • Brinch Hansen管程模型:由丹麥電腦科學家Per Brinch Hansen提出,是一種改進的管程模型。在Brinch Hansen管程模型中,每個管程也有一個條件變數和等待佇列,但與其他管程模型不同的是,它允許多個執行緒同時在管程中等待,並且不需要像Hoare管程那樣每次只喚醒一個等待執行緒。

在Java中,採用的是基於Mesa管程模型實現的管程機制。具體地,Java中的synchronized關鍵字就是基於Mesa管程模型實現的,包括Java中的AbstractQueuedSynchronizer(AQS)可以被看作是一種基於管程模型實現的同步框架。

MESA模型

主要特點

  • 互斥存取

MESA模型採用了互斥存取的機制,即同一時刻只能有一個執行緒進入管程執行程式碼。

  • 條件變數

MESA模型還引入了條件變數的概念,用於實現執行緒間的等待和喚醒操作。條件變數提供了一種機制,使得執行緒可以在等待某個條件成立時掛起,並在條件成立時被喚醒。

  • 等待佇列

MESA模型使用等待佇列來維護處於等待狀態的執行緒,這些執行緒都在等待條件變數成立。等待佇列由一個或多個條件變陣列成,每個條件變數都有自己的等待佇列。

  • 原子操作

MESA模型要求管程中的所有操作都是原子操作,即一旦進入管程,就不能被中斷,直到操作執行完畢。

AQS

在講ReentrantLock之前先說一下AQS,AQS(AbstractQueuedSynchronizer)是Java中的一個同步器,它是許多同步類(如ReentrantLock、Semaphore、CountDownLatch等)的基礎。AQS提供了一種實現同步操作的框架,其中包括獨佔模式和共用模式,以及一個等待佇列來管理執行緒的等待和喚醒。AQS也借鑑了Mesa模型的思想。

共用變數

AQS內部維護了屬性volatile int state表示資源的可用狀態
state三種存取方式:

  • getState()
  • setState()
  • compareAndSetState()

資源存取方式

Exclusive-獨佔,只有一個執行緒能執行,如ReentrantLock

Share-共用,多個執行緒可以同時執行,如Semaphore/CountDownLatch

主要方法

  • isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共用方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共用方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false

佇列

  • 同步等待佇列: 主要用於維護獲取鎖失敗時入隊的執行緒。
  • 條件等待佇列: 呼叫await()的時候會釋放鎖,然後執行緒會加入到條件佇列,呼叫signal()喚醒的時候會把條件佇列中的執行緒節點移動到同步佇列中,等待再次獲得鎖。

node節點等待狀態

  • 值為0,初始化狀態,表示當前節點在sync佇列中,等待著獲取鎖。
  • CANCELLED,值為1,表示當前的執行緒被取消;
  • SIGNAL,值為-1,表示當前節點的後繼節點包含的執行緒需要執行,也就是unpark;
  • CONDITION,值為-2,表示當前節點在等待condition,也就是在condition佇列中;
  • PROPAGATE,值為-3,表示當前場景下後續的acquireShared能夠得以執行;

ReentrantLock原始碼分析

在ReentrantLock中有一個內部類Sync會繼承 AQS然後將同步器所有呼叫都對映到Sync對應的方法。

範例化ReentrantLock

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}
/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock還提供了一個傳佈林值的範例化方式,這個傳true用來建立一個公平鎖的,預設是建立非公平鎖。非公平鎖的話 sync是用NonfairSync來進行範例化,公平鎖sync是用FairSync來進行範例化。

加鎖

現在假設有AB兩個執行緒來競爭鎖

A執行緒加鎖成功

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        //CAS修改state狀態
        if (compareAndSetState(0, 1))
           //修改成功設定exclusiveOwnerThread
            setExclusiveOwnerThread(Thread.currentThread());
        else
           //嘗試獲取資源
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

假定A執行緒先CAS修改成功,他會設定exclusiveOwnerThread為A執行緒

B執行緒嘗試加鎖

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

我們先看tryAcquire()方法,這裡體現出了他的可重入性。

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //獲取當前資源標識
    int c = getState();
    if (c == 0) {
        //如果資源沒被佔有CAS嘗試加鎖
        if (compareAndSetState(0, acquires)) {
            //修改成功設定exclusiveOwnerThread
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //資源被佔有要判斷佔有資源的執行緒是不是當前執行緒,加鎖成功設定的exclusiveOwnerThread值在這裡就派上了用處
    else if (current == getExclusiveOwnerThread()) {
        //這下面就是將重入次數設定到資源標識裡
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

根據上面原始碼我們可以看出B執行緒嘗試加鎖是失敗的,接下來看嘗試加鎖失敗後的方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg),該方法實現分為兩個部分:

  • addWaiter(Node.EXCLUSIVE):該方法會將當前執行緒加入到等待佇列(即 Sync 中的 queue)的尾部,並返回該節點。這個節點的模式是獨佔模式(Node.EXCLUSIVE),表示當前執行緒想要獲取獨佔鎖。
  • acquireQueued(Node node, int arg):該方法是一個迴圈方法,用於等待和獲取鎖。

我們先解析addWaiter

private Node addWaiter(Node mode) {
    //構建一個當前執行緒的node節點 這裡prev 和 next 都為null
    Node node = new Node(Thread.currentThread(), mode);
    // 指向雙向連結串列的尾節點的參照
    Node pred = tail;
    //B執行緒進來目前還未構建任何佇列這裡肯定是空的
    if (pred != null) {
        //如果已經構建過佇列會把當前執行緒的node節點的上一個node節點指向tail尾節點
        node.prev = pred;
        //CAS操作把當前執行緒的node節點設定為新的tail尾節點
        if (compareAndSetTail(pred, node)) {
            //把舊的tail尾節點的下一個node節點指向當前執行緒的node節點
            pred.next = node;
            return node;
        }
    }
    //尾節點為空執行
    enq(node);
    return node;
}
private Node enq(final Node node) {
    //死迴圈當tail 尾節點不為空才會跳出
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            //用CAS構建出一個head空的node節點
            if (compareAndSetHead(new Node()))
            //將當前空的node節點交給尾節點(下一次迴圈就會走else分支)
                tail = head;
        } else {
            //把我們addWaiter中建立的node節點的prev指向了當前執行緒node節點
            node.prev = t;
            //將tail尾節點更改為當前執行緒的node節點
            if (compareAndSetTail(t, node)) {
                //將t的下一個節點指向當前執行緒建立的node節點
                t.next = node;
                return t;
            }
        }
    }
}

執行完這裡初步的一個等待佇列就構建好了

解析完addWaiter我們再來解析acquireQueued,addWaiter執行完後的結果會返回一個雙向列表的node節點

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        //中斷標誌位
        boolean interrupted = false;
        for (;;) {
            //獲取當前執行緒node節點的上一個節點
            final Node p = node.predecessor();
            //如果上一個節點就是head節點說明當前執行緒其實是處在佇列第一位然後就會再次嘗試加鎖
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //這裡是重點 這個方法來判斷當前執行緒是否應該進入等待狀態
            if (shouldParkAfterFailedAcquire(p, node) &&
                //呼叫LockSupport.park(this)阻塞了當前執行緒等待被喚醒
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //獲取上一個節點的等待狀態
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)//表示當前節點的後繼節點包含的執行緒需要執行
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {//當前執行緒被取消
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//設定等待狀態為-1
    }
    return false;
}

執行到parkAndCheckInterrupt()B執行緒就會被阻塞了,後續的邏輯我們在解鎖操作unlock之後再繼續說

釋放鎖

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
   //嘗試釋放鎖
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //head節點不為空並且等待狀態不是0就去進行unpark操作
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    //head節點等待狀態
    int ws = node.waitStatus;
    //
    if (ws < 0)
       //將頭節點的等待狀態修改為0
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    //獲取頭節點的下一個節點(也就是B節點)
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {//節點為空或者執行緒處於取消狀態
        s = null;
        //從尾節點往上找符合條件的節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //對該執行緒進行unpark喚醒(B節點)
        LockSupport.unpark(s.thread);
}

喚醒之後我們的B執行緒就能繼續往下走了,我們繼續看剛剛的acquireQueued()方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            //這裡嘗試獲取鎖由於A執行緒釋放了鎖這裡是肯定獲取成功的
            if (p == head && tryAcquire(arg)) {
                //把head設定為當前節點(也就是往前移一位,並且把上一個節點指向指為null)
                setHead(node);
                //把剛剛的上一個節點也指向為null (這裡他就沒參照了會被GC回收掉)
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                //剛剛在這裡阻塞現在被喚醒
                parkAndCheckInterrupt())
                //設定標誌中斷位為true 然後開始下一次迴圈
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

總結

  • 鎖的實現方式:ReentrantLock內部通過維護一個state變數來表示鎖的狀態,其中高16位元表示持有鎖的執行緒ID,低16位元表示重入次數。使用CAS操作來獲取鎖,如果當前鎖未被持有,則將state的高16位元設定為當前執行緒ID,並將低16位元設定為1,表示重入次數為1;如果當前鎖已經被持有,則判斷持有鎖的執行緒是否為當前執行緒,如果是,則將state的低16位元加1表示重入,如果不是,則進入等待佇列。
  • 等待佇列:ReentrantLock中的等待佇列採用了CLH佇列的實現方式,每個等待執行緒會被封裝成一個Node節點,節點中維護了前繼節點、後繼節點和等待狀態等資訊。當一個執行緒需要等待鎖時,會將自己封裝成一個Node節點插入到等待佇列的尾部,並在自旋等待時自動阻塞。
  • 公平鎖與非公平鎖:ReentrantLock可以通過建構函式中的fair引數來指定鎖的公平性,當fair為true時表示該鎖是公平鎖,即等待佇列中的執行緒會按照先進先出的順序獲取鎖;當fair為false時表示該鎖是非公平鎖,即等待佇列中的執行緒會按照隨機順序獲取鎖。
  • 鎖的釋放:ReentrantLock中的鎖釋放採用了state變數的遞減來實現,當一個執行緒釋放鎖時,會將state的低16位元減1,如果減1後低16位元變為0,則表示當前執行緒已經完全釋放了鎖,此時會將高16位元清零,表示鎖變為了可獲取狀態,等待佇列中的執行緒可以繼續競爭鎖。

以上就是ReentrantLock從原始碼解析Java多執行緒同步學習的詳細內容,更多關於Java多執行緒ReentrantLock的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com