首頁 > 軟體

Java 多執行緒並行ReentrantLock

2022-06-16 14:00:46

背景

在 Java 中實現執行緒安全的傳統方式是 synchronized 關鍵字,雖然它提供了一定的同步能力,但它在使用上是嚴格的互斥同步實現:一個執行緒只能獲取一次鎖,沒有給其他執行緒提供等待佇列等機制,以至於當一個鎖被釋放後,任意執行緒都有可能獲取到鎖,沒有執行緒等待的優先順序順序,會導致重要的執行緒在沒有爭用到鎖的情況下,長時間阻塞。為了解決 synchronized 的痛點,Java 提供了 ReentrantLock 可重入鎖來提供更豐富的能力和靈活性。

ReentrantLock

ReentrantLock 是一種可重入互斥鎖,其基本能力與使用 synchronized 關鍵字相同,但拓展了一些功能。它實現了 Lock 介面,在存取共用資源時提供了同步的方法。操作共用資源的程式碼被加鎖和解鎖方法的呼叫之間,從而確保當前執行緒在呼叫加鎖方法後,阻止其他執行緒試圖存取共用資源。

可重入特性

ReentrantLock 由上次成功鎖定的但尚未解鎖的執行緒持有;當鎖不被任何執行緒擁有時,呼叫 lock 方法的執行緒將獲取到這個 ReentrantLock,如果當前執行緒已經擁有 ReentrantLock ,lock 方法會立即返回。

ReentrantLock 允許執行緒多次進入資源鎖。當執行緒第一次進入鎖時,保持計數設定為 1。在解鎖之前,執行緒可以再次重新進入鎖定狀態,並且每次保持計數加一。對於每個解鎖請求,保持計數減一,當保持計數為 0 時,資源被解鎖。

公平鎖設定引數

ReentrantLock 的構造器接收一個可選的 fairness 引數(Boolean 型別)。當設定為 true 時,線上程爭用時,鎖優先授予等待時間最長的執行緒存取。否則,此鎖不保證任何特定的順序。但是請注意,鎖的公平性不能保證執行緒排程的公平性。

可重入鎖還提供了一個公平引數,通過該引數,鎖將遵循鎖請求的順序,即線上程解鎖資源後,鎖將轉到等待時間最長的執行緒。這種公平模式是通過將 true 傳遞給鎖的建構函式來設定的。

原始碼分析

Lock 介面

ReentrantLock 實現了 Lock 介面,所以分析原始碼先從 Lock 介面開始:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();​
    Condition newCondition();
}

Lock 介面定義了更靈活和更廣泛的鎖定操作。synchronized 關鍵字是 JVM 底層提供了 monitor 指令的形式加鎖,這導致了獲取多個鎖時,需要按獲取順序的倒序解鎖。Lock 就是為了解決這種不夠靈活的問題而出現的。Lock 介面的實現通過允許在不同範圍內獲取和釋放鎖以及允許多個鎖按任意順序的獲取和釋放。隨著這種靈活性的增加,額外的職責也就隨之而來,synchronized 關鍵字以程式碼塊的結構加鎖,執行完成鎖會自動釋放,而 Lock 的實現則需要手動釋放鎖,大多數情況下,

應該使用下面的語句實現:

 Lock l = ...;
 l.lock();
 try {
   // access the resource protected by this lock
 } finally {
   l.unlock();
 }

當鎖定和解鎖發生在不同的作用域時,必須注意確保所有在持有鎖時執行的程式碼都受到 try-finally 或 try-catch 的保護,以確保在必要時釋放鎖。

Lock 介面中定義的方法可以劃分為三部分:

  • 加鎖操作
  • 解鎖操作
  • newCondition

加鎖操作

加鎖操作提供了四個方法:

    // 獲取鎖,如果鎖不可用,則當前執行緒將被禁用以用於執行緒排程目的並處於休眠狀態,直到獲取到鎖為止。
    void lock();​
    void lockInterruptibly() throws InterruptedException;
   boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

lock():獲取鎖,如果無法獲取到,則當前執行緒進入阻塞狀態,直到獲取到鎖為止。

lockInterruptibly():除非當前執行緒被中斷,否則去獲取鎖。如果獲取到了鎖,則立即返回。如果沒有爭用到鎖,則當前執行緒阻塞,直到發生下面兩種情況之一:

如果當前執行緒:

以上兩種情況都會丟擲 InterruptedException ,並清除當前執行緒的中斷狀態。

  • 當前執行緒獲取到了鎖
  • 其他執行緒中斷了當前執行緒
  • 在進入此方法時,設定了中斷狀態
  • 在獲取鎖的過程中被中斷

tryLock()

僅當鎖處於空閒狀態時,才獲取鎖。獲取到鎖立即返回 true,如果鎖被其他執行緒持有,則此方法立即返回 false 。

此方法的典型用法是:

 Lock lock = ...;
 if (lock.tryLock()) {
   try {
     // manipulate protected state
   } finally {
     lock.unlock();
   }
 } else {
   // perform alternative actions
 }

這種用法確保鎖在獲得時解鎖,並且在未獲得鎖時不嘗試解鎖。

tryLock(long time, TimeUnit unit)

  • 如果在給定時間內鎖處於空閒狀態,且當前執行緒沒有被中斷,則獲取鎖。
  • 如果當前執行緒成功獲取到了鎖,則此方法立即返回 true ;如果當前執行緒無法獲取到鎖,則當前執行緒會進入阻塞狀態直到發生下面三種情況之一:
  • 如果進入此方法時當前執行緒處於中斷狀態或獲取鎖的過程中已進入中斷狀態,以上兩種情況都會丟擲 InterruptedException ,並清除當前執行緒的中斷狀態。
  • 此外,如果 time 引數小於等於 0 ,該方法不會等待。
    • 鎖被當前執行緒成功獲取
    • 指定時間超時
    • 其他執行緒中斷了當前執行緒

解鎖操作:

解鎖操作只提供了 unlock() 方法。

newCondition:

返回繫結到此 Lock 的 Condition 範例。

內部類

ReentrantLock 有三個內部類,分別是 Sync、NonfairSync、FairSync 。

它們的繼承關係是:

Sync

這個類是 AQS 的直接實現,它為公平鎖實現 FairSync 和非公平鎖實現 NonfairSync 提供了共同的基礎能力。

abstract static class Sync extends AbstractQueuedSynchronizer {
    @ReservedStackAccess
    final boolean tryLock()
    abstract boolean initialTryLock();
    @ReservedStackAccess
    final void lock()
    @ReservedStackAccess
    final void lockInterruptibly()
    @ReservedStackAccess
    final boolean tryLockNanos(long nanos)
    @ReservedStackAccess
    protected final boolean tryRelease(int releases)

    protected final boolean isHeldExclusively()
    final ConditionObject newCondition()
    final Thread getOwner()
    final int getHoldCount()
    final boolean isLocked()
}

下面是一些重點的方法講解。

tryLock

這個方法執行了一個不公平的嘗試加鎖操作:

    @ReservedStackAccess
    final boolean tryLock() {
        Thread current = Thread.currentThread();    // 獲取當前執行緒
        int c = getState();                         // 從 AQS 中獲取狀態
        if (c == 0) {                               // 當前鎖的狀態為未被持有
            if (compareAndSetState(0, 1)) {         // CAS 更新狀態為加鎖狀態 1
                setExclusiveOwnerThread(current);   // 設定當前持有的執行緒
                return true;                        // 獲取鎖成功,return true
            }
        } else if (getExclusiveOwnerThread() == current) {  // 如果當前持有鎖的執行緒是當前執行緒
            if (++c < 0) // overflow                        // c 即是狀態也是計數器,可重入計數 + 1
                throw new Error("Maximum lock count exceeded");
            setState(c);                                    // 更新狀態
            return true;                                    // 重入成功,return true
        }
        return false;                                       // 嘗試獲取鎖失敗。
    }

為什麼說它是不公平的,因為這個方法沒有按照公平等待原則,讓等待時間最久的執行緒優先獲取鎖資源。

initialTryLock

這是一個抽象方法,用來在 lock 前執行初始化工作。

lock

    @ReservedStackAccess
    final void lock() {
        if (!initialTryLock())
            acquire(1);
    }

先根據 initialTryLock() 進行判斷,然後呼叫 acquire(1) ,acquire 方法在 AQS 中:

    public final void acquire(int arg) {
        if (!tryAcquire(arg))
            acquire(null, arg, false, false, false, 0L);
    }

這個方法會讓當前執行緒去嘗試獲取鎖資源,並忽略中斷。通過呼叫 tryAcquire 至少一次來實現,如果失敗,則去等待佇列排隊,可能會導致阻塞。

lockInterruptibly

    @ReservedStackAccess
    final void lockInterruptibly() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!initialTryLock())
            acquireInterruptibly(1);
    }

這個方法相當於在 lock 方法前首先進行了執行緒中斷檢查,如果沒有被中斷,也是通過 initialTryLock() 判斷是否需要執行嘗試獲取鎖的操作。與 lock 方法不同,這裡呼叫的是 (1)

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted() || (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
        throw new InterruptedException();
}

對執行緒中斷進行了檢查,如果執行緒被中斷則中止當前操作,至少呼叫 1 次 tryAcquire 嘗試去獲取鎖資源。否則執行緒去佇列排隊,此方法可能會導致阻塞,直到呼叫 tryAcquire 成功或執行緒被中斷。

tryLockNanos

        final boolean tryLockNanos(long nanos) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return initialTryLock() || tryAcquireNanos(1, nanos);
        }
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        if (!Thread.interrupted()) {
            if (tryAcquire(arg))
                return true;
            if (nanosTimeout <= 0L)
                return false;
            int stat = acquire(null, arg, false, true, true,
                               System.nanoTime() + nanosTimeout); // 多了一個超時時間
            if (stat > 0)
                return true;
            if (stat == 0)
                return false;
        }
        throw new InterruptedException();
    }

本質上呼叫 acquire ,多設定了一個 time 引數。

tryRelease

        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            boolean free = (c == 0); // c = 0 說明成功釋放鎖資源
            if (free)
                setExclusiveOwnerThread(null);
            setState(c);
            return free;
        }

可以看出,tryRelease 方法最終更新了 State ,進一步說明了 AQS 的實現,本質上都是通過原子 int 來表示同步狀態的。

newCondition

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

這裡的 newCondition 返回的是 AQS 的內部類 ConditionObject 的範例。

Sync 中的方法與其含義:

NonfairSync 非公平鎖

    static final class NonfairSync extends Sync {
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            if (compareAndSetState(0, 1)) { // 比較並設定狀態成功,狀態0表示鎖沒有被佔用
                setExclusiveOwnerThread(current); // 設定當前執行緒為持有鎖的執行緒
                return true;
            } else if (getExclusiveOwnerThread() == current) { // 重入情況
                int c = getState() + 1;
                if (c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            } else
                return false;
        }
​
        protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    }

NonfairSync 實現了 initialTryLock() ,其中主要是為當前物件設定持有執行緒;如果是重入的情況,則 state 計數 + 1 。這個方法中的邏輯和 tryLock 方法十分相似,他們都是不公平的。每次嘗試獲取鎖,都不是按照公平等待的原則,讓等待時間最久的執行緒獲得鎖,所以這是不公平鎖。

FairSync

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        /**
         * 僅在可重入或佇列為空時獲取。
         */
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // 鎖處於可用狀態
                if (!hasQueuedThreads() && compareAndSetState(0, 1)) { // 查詢是否有執行緒正在等待獲取此鎖
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (getExclusiveOwnerThread() == current) {
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            }
            return false;
        }
        /**
         * 僅當執行緒是佇列頭節點或為空時獲取。
         */
        protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 && !hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    }

公平鎖依賴兩個判斷條件實現:

  • hasQueuedThreads 用來查詢是否有其他執行緒正在等待獲取此鎖。
  • hasQueuedPredecessors 是用來查詢是否有其他執行緒比當前執行緒等待的時間更長。

當存在其他執行緒等待時間更久時,當前執行緒的 tryAcquire 會直接返回 false 。

建構函式

ReentrantLock 有兩個建構函式:

    public ReentrantLock() {
        sync = new NonfairSync();
    }​
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

其中一個帶有 boolean 引數的構造方法,用來根據引數 fair 實現公平鎖或非公平鎖,無參構造方法預設實現是非公平鎖。

核心屬性和方法

private final Sync sync;

從構造方法中就可以看出,ReentrantLock 的 sync 屬性,代表了鎖的策略(公平 or 非公平)。

sync 是一個 Sync 型別的物件,繼承自 AQS ,ReentrantLock 對外暴露的方法,內部實際上就是呼叫 Sync 對應的方法實現的:

public class ReentrantLock implements Lock, java.io.Serializable {
    // ...
    public void lock() {
        sync.lock();
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.lockInterruptibly();
    }
    
    public boolean tryLock() {
        return sync.tryLock();
    }
    
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryLockNanos(unit.toNanos(timeout));
    }
    
    public void unlock() {
        sync.release(1);
    }
    
    public Condition newCondition() {
        return sync.newCondition();
    }
    
    public int getHoldCount() {
        return sync.getHoldCount();
    }
    
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    
    public boolean isLocked() {
        return sync.isLocked();
    }
    
    public final boolean isFair() {
        return sync instanceof FairSync;
    }
    
    protected Thread getOwner() {
        return sync.getOwner();
    }
    // ... 
}

ReentrantLock 看起來就像是 Sync 的代理類,當呼叫 ReentrantLock 對外暴露的方法時,會根據 sync 物件的不同的型別呼叫不同的實現 。

比如,下圖就是一個公平鎖的呼叫過程:

ReentrantLock.lock -> 
FairSync.lock -> 
AQS.acquire -> 
FairSync.tryAcquire -> 
AQS.hasQueuedPredecessors -> AQS.setExclusiveOwnerThread

總結

ReentrantLock 實現了 Lock 介面,有三個內部類,其中 Sync 繼承自 AQS ,而後兩者繼承自 Sync ,它們都繼承了 AQS 的能力。本質上來說 ReentrantLock 的底層原理就是 AQS 。

在 Sync 的兩個子類 FairSync 和 NonfairSync 分別是公平鎖策略和非公平鎖策略的實現,它們通過實現initialTryLock()方法中不同的邏輯(公平鎖多了一個檢查是否有其他等待執行緒的條件)。然後實現了不同的 tryAcquire(int acquires) ,從而線上程嘗試獲取鎖時,執行不同的策略。

到此這篇關於Java 多執行緒並行ReentrantLock的文章就介紹到這了,更多相關Java ReentrantLock內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com