首頁 > 軟體

Redisson如何解決Redis分散式鎖提前釋放問題

2022-05-26 18:02:57

前言:

在分散式場景下,相信你或多或少需要使用分散式鎖來存取臨界資源,或者控制耗時操作的並行性。

當然,實現分散式鎖的方案也比較多,比如資料庫、redis、zk 等等。本文主要結合一個線上案例,講解 redis 分散式鎖的相關實現。

一、問題描述:

某天線上出現了資料重複處理問題,經排查後發現,竟然是單次處理時間較長,redis 分散式鎖提前釋放導致相同請求並行處理。

其實,這是一個鎖續約的問題,對於一把分散式鎖,我們需要考慮,設定鎖多長時間過期、出現異常如何釋放鎖?

以上問題便是本文要討論的主題。

二、原因分析:

      專案採用較簡單的自定義 redis 分散式鎖,為避免死鎖定義預設過期時間 10s,如下:

    override fun lock() {

        while (true) {
            //嘗試獲取鎖
            if (tryLock()) {
                return
            }
            try {
                Thread.sleep(10)
            } catch (e: InterruptedException) {
                e.printStackTrace()
            }

        }
    }

    override fun tryLock(): Boolean {
        val value = getUniqueSign() // 隨機串
        val flag = redisTemplate!!.opsForValue().setIfAbsent(name, value, 10000, TimeUnit.MILLISECONDS)
        if (flag != null && flag) {
            VALUE_lOCAL.set(value)
            INTO_NUM_LOCAL.set(if (INTO_NUM_LOCAL.get() != null) INTO_NUM_LOCAL.get() + 1 else 1)
            return true
        }
        return false
    }

缺乏對鎖自動續期等實現。

三、解決方案:

1、思考: 

針對這種場景,可以考慮的是如何給鎖自動續期-當業務沒有執行結束的情況下,當然也可以自定義實現 比如開一個後臺執行緒定時的給這些拿到鎖的執行緒續期。

Redisson 也正是基於這種思路實現自動續期的分散式鎖,各種異常情況也考慮的更加完善,綜合考慮採用 Redisson 的分散式鎖解決方案優化。

2、Redisson簡單設定:

@Configuration
@EnableConfigurationProperties(RedissonProperties::class)
class RedissonConfig {

    @Bean
    fun redissonClient(redissonProperties: RedissonProperties): RedissonClient {
        val config = Config()
        val singleServerConfig = redissonProperties.singleServerConfig!!
        config.useSingleServer().setAddress(singleServerConfig.address)
                .setDatabase(singleServerConfig.database)
                .setUsername(singleServerConfig.username)
                .setPassword(singleServerConfig.password)
                .setConnectionPoolSize(singleServerConfig.connectionPoolSize)
              .setConnectionMinimumIdleSize(singleServerConfig.connectionMinimumIdleSize)
                .setConnectTimeout(singleServerConfig.connectTimeout)
                .setIdleConnectionTimeout(singleServerConfig.idleConnectionTimeout)
                .setRetryInterval(singleServerConfig.retryInterval)
                .setRetryAttempts(singleServerConfig.retryAttempts)
                .setTimeout(singleServerConfig.timeout)
        return Redisson.create(config)
    }

}

@ConfigurationProperties(prefix = "xxx.redisson")
class RedissonProperties {
    var singleServerConfig: SingleServerConfig? = null
}

Redis 服務使用的騰訊雲的哨兵模式架構,此架構對外開放一個代理地址存取,因此這裡設定單機模式設定即可。

如果你是自己搭建的 redis 哨兵模式架構,需要按照檔案設定相關必要引數

3、使用樣例:

    ...
  
    @Autowired
    lateinit var redissonClient: RedissonClient

 
    ... 

    fun xxx() {

      ...

      val lock = redissonClient.getLock("mylock")
      lock.lock()
      try {
        
        ... 

      } finally {
        lock.unlock()
      }

        ...

    }

使用方式和JDK提供的鎖是不是很像?是不是很簡單?

正是Redisson這類優秀的開源產品的出現,才讓我們將更多的時間投入到業務開發中...

四、原始碼分析

下面來看看 Redisson 對常規分散式鎖的實現,主要分析 RedissonLock

1、lock加鎖操作

    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }



    // 租約期限, 也就是expire時間, -1代表未設定 將使用系統預設的30s
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        // 嘗試拿鎖, 如果能拿到就直接返回
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        // 如果拿不到鎖就嘗試一直輪循, 直到成功獲取鎖或者異常終止
        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                ...

            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

1.1、tryAcquire

    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        // 呼叫真正獲取鎖的操作
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            // 這裡是成功獲取了鎖, 嘗試給鎖續約
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

    // 通過lua指令碼真正執行加鎖的操作
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        // 如果key不存在, 那正好, 直接set並設定過期時間
        // 如果key存在, 就有兩種情況需要考慮
        //   - 同一執行緒獲取重入鎖,直接將field(也就是getLockName(threadId))對應的value值+1
        //   - 不同執行緒競爭鎖, 此次加鎖失敗, 並直接返回此key對應的過期時間
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

1.2、續約

通過 scheduleExpirationRenewal 給鎖續約

    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            // 續約操作
            renewExpiration();
        }
    }

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        // 設定延遲任務task, 在時長internalLockLeaseTime/3之後執行, 定期給鎖續期
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                // 真正執行續期命令操作
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    // 這次續期之後, 繼續schedule自己, 達到持續續期的效果
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

    // 所謂續期, 就是將expire過期時間再延長
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        // 如果key以及當前執行緒存在, 則延長expire時間, 並返回1代表成功;否則返回0代表失敗
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

2、unlock解鎖操作

  public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            ...
        }
     
    }

    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<>();
        // 執行解鎖操作
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        // 操作成功之後做的事
        future.onComplete((opStatus, e) -> {
            // 取消續約task
            cancelExpirationRenewal(threadId);
            
            ...

        });

        return result;
    }

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        // 如果key以及當前執行緒對應的記錄已經不存在, 直接返回空
        // 否在將field(也就是getLockName(threadId))對應的value減1
        //   - 如果減去1之後值還大於0, 那麼重新延長過期時間
        //   - 如果減去之後值小於等於0, 那麼直接刪除key, 並行布訂閱訊息
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

以上便是 redisson 使用者端工具對 redis 分散式鎖的加/解鎖具體實現,主要解決了以下幾個問題

    1、死鎖問題:設定過期時間

    2、可重入問題:重入+1, 釋放鎖-1,當值=0時代表完全釋放鎖

    3、續約問題:可解決鎖提前釋放問題

    4、鎖釋放:誰加鎖就由誰來釋放

總結:

本文由一個線上問題做引子,通過 redis 分散式鎖的常用實現方案,最終選定 redisson 的解決方案; 並分析 redisson 的具體實現細節

相關參考:

到此這篇關於Redisson如何解決Redis分散式鎖提前釋放問題的文章就介紹到這了,更多相關Redis分散式鎖提前釋放內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com