首頁 > 軟體

Redisson 加鎖解鎖的實現

2022-08-18 22:02:41

分散式鎖使用

對於 redisson 分散式鎖的使用很簡單:

1、呼叫 getLock 函數獲取鎖操作物件;
2、呼叫 tryLock 函數進行加鎖;
3、呼叫 unlock 函數進行解鎖;

注意 unlock 操作需要放到 finally 程式碼段中,保證鎖可以被釋放。

private void sumLock() {
    lock = redissonClient.getLock("sum-lock");
    
    boolean b = lock.tryLock();
    if (!b) {
        log.info("獲取不到鎖");
        return;
    }
    
    try {
        for (int j = 0; j < 20000; j++) {
            ++sum;
        }
    } finally {
        lock.unlock();
    }
    
}

getLock

getLock 範例化 RedissonLock,相當於 Lock lock = new ReentrantLock() 操作;

public RLock getLock(String name) {
    // 範例化 RedissonLock,引數為指令執行器和鎖名稱
    return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    // 命令執行器,用於執行lua指令碼
    this.commandExecutor = commandExecutor;
    // 連線管理器的ID
    this.id = commandExecutor.getConnectionManager().getId();
    // 鎖續期時間(看門狗),鎖預設續期時間是 30s。
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = this.id + ":" + name;
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

tryLock

@Override
public boolean tryLock() {
    return get(tryLockAsync());
}

@Override
public RFuture<Boolean> tryLockAsync() {
    return tryLockAsync(Thread.currentThread().getId());
}

@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
    return tryAcquireOnceAsync(-1, -1, null, threadId);
}

這裡是一系列的呼叫,可以直接跳過,直接進入到 tryAcquireOnceAsync 函數,看看 tryAcquireOnceAsync 函數的處理邏輯。

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 由於我們呼叫tryLocak沒有傳遞任何引數,leaseTime預設為-1,不走判斷
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }
    
    // 呼叫獲取鎖 枷鎖的主要邏輯在這裡
    RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        // 如果發生異常那麼直接放回了
        if (e != null) {
            return;
        }

        // 鎖續期
        if (ttlRemaining) {
            scheduleExpirationRenewal(threadId);
        }
    });
    // 返回結果
    return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    // 將時間轉化為毫秒
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 執行指令碼
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

Redisson 中儲存鎖的資料型別結構採用的的是 hash,Key 為鎖名稱,VALUE的屬性是 Redisson 使用者端ID和執行緒ID組合而成的字串,值是鎖的重入次數,採用 hash 計數實現鎖的重入性。

該函數主要執行 lua 指令碼,指令碼的邏輯為:

1、redis.call(‘exists’, KEYS[1]) == 0 用於判斷鎖是否存在,等於 0 說明不存在,表明此時沒有使用者端持有鎖,此使用者端獲取鎖成功;走步驟 2,否則走步驟 4;
2、設定鎖,並且對鎖進行 +1 操作,標識獲取鎖的次數;
3、為鎖設定過期時間,成功返回 nil;
4、redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1 判斷鎖是否本使用者端持有,等於1說明是,此時是再次獲取鎖(重入),走步驟 5,否則走 7;
5、對鎖進行 +1 操作,標識獲取鎖的次數;
6、為鎖設定過期時間,成功返回 nil;
7、如果1和4的判斷都不滿足,那麼返回鎖的的剩餘時間;

unLock

@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    // 釋放鎖的邏輯主要這裡
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}

這裡是一系列的呼叫,可以直接跳過,直接進入到 unlockInnerAsync 函數,看看 unlockInnerAsync 函數的處理邏輯。

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

該函數主要執行 lua 指令碼,指令碼的邏輯為:

1、redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0 用於判斷鎖是否為當前使用者端持有,等於 0 說明不是直接返回 nil,否則說明是,走步驟 2;
2、對鎖進行 -1 操作,並且獲取其計數 counter;
3、判斷 counter > 0,如果大於 0 說明該使用者端多次獲取鎖,對鎖進行續期並且返回 0,因為此時業務還沒有執行完畢,否則走步驟 4;
4、如果count 小於等於 0 則刪除鎖,傳送釋放鎖的訊息,返回 1;
5、如果以上邏輯都不滿足,那麼直接返回nil;

總結

redisson 加鎖解鎖:

1、redisson 使用 lua 指令碼保證命令執行的原子性;
2、redisson 使用 redis 的 hash 資料結構型別來儲存鎖資訊,使用 鎖名稱作為 hash 名稱,使用“使用者端ID:執行緒ID”作為鍵,使用重入次數作為值;
3、每次獲取鎖,會對值進行 +1 操作,並且設定過期時間;
4、每次釋放鎖,會對值進行 -1 操作,如果沒有減少為 0,則繼續設定鎖的超時時間,否則刪除鎖,並且傳送釋放鎖的訊息。

到此這篇關於Redisson 加鎖解鎖的實現的文章就介紹到這了,更多相關Redisson 加鎖解鎖內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com