<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在分散式場景下,相信你或多或少需要使用分散式鎖來存取臨界資源,或者控制耗時操作的並行性。
當然,實現分散式鎖的方案也比較多,比如資料庫、redis、zk 等等。本文主要結合一個線上案例,講解 redis 分散式鎖的相關實現。
某天線上出現了資料重複處理問題,經排查後發現,竟然是單次處理時間較長,redis 分散式鎖提前釋放
導致相同請求並行處理。
其實,這是一個鎖續約
的問題,對於一把分散式鎖,我們需要考慮,設定鎖多長時間過期、出現異常如何釋放鎖?
以上問題便是本文要討論的主題。
專案採用較簡單的自定義 redis 分散式鎖,為避免死鎖定義預設過期時間 10s,如下:
override fun lock() { while (true) { //嘗試獲取鎖 if (tryLock()) { return } try { Thread.sleep(10) } catch (e: InterruptedException) { e.printStackTrace() } } } override fun tryLock(): Boolean { val value = getUniqueSign() // 隨機串 val flag = redisTemplate!!.opsForValue().setIfAbsent(name, value, 10000, TimeUnit.MILLISECONDS) if (flag != null && flag) { VALUE_lOCAL.set(value) INTO_NUM_LOCAL.set(if (INTO_NUM_LOCAL.get() != null) INTO_NUM_LOCAL.get() + 1 else 1) return true } return false }
缺乏對鎖自動續期等實現。
針對這種場景,可以考慮的是如何給鎖自動續期-當業務沒有執行結束的情況下,當然也可以自定義實現 比如開一個後臺執行緒定時的給這些拿到鎖的執行緒續期。
Redisson 也正是基於這種思路實現自動續期的分散式鎖,各種異常情況也考慮的更加完善,綜合考慮採用 Redisson 的分散式鎖解決方案優化。
@Configuration @EnableConfigurationProperties(RedissonProperties::class) class RedissonConfig { @Bean fun redissonClient(redissonProperties: RedissonProperties): RedissonClient { val config = Config() val singleServerConfig = redissonProperties.singleServerConfig!! config.useSingleServer().setAddress(singleServerConfig.address) .setDatabase(singleServerConfig.database) .setUsername(singleServerConfig.username) .setPassword(singleServerConfig.password) .setConnectionPoolSize(singleServerConfig.connectionPoolSize) .setConnectionMinimumIdleSize(singleServerConfig.connectionMinimumIdleSize) .setConnectTimeout(singleServerConfig.connectTimeout) .setIdleConnectionTimeout(singleServerConfig.idleConnectionTimeout) .setRetryInterval(singleServerConfig.retryInterval) .setRetryAttempts(singleServerConfig.retryAttempts) .setTimeout(singleServerConfig.timeout) return Redisson.create(config) } } @ConfigurationProperties(prefix = "xxx.redisson") class RedissonProperties { var singleServerConfig: SingleServerConfig? = null }
Redis 服務使用的騰訊雲的哨兵模式架構,此架構對外開放一個代理地址存取,因此這裡設定單機模式設定即可。
如果你是自己搭建的 redis 哨兵模式架構,需要按照檔案設定相關必要引數
... @Autowired lateinit var redissonClient: RedissonClient ... fun xxx() { ... val lock = redissonClient.getLock("mylock") lock.lock() try { ... } finally { lock.unlock() } ... }
使用方式和JDK提供的鎖是不是很像?是不是很簡單?
正是Redisson這類優秀的開源產品的出現,才讓我們將更多的時間投入到業務開發中...
下面來看看 Redisson 對常規分散式鎖的實現,主要分析 RedissonLock
@Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } // 租約期限, 也就是expire時間, -1代表未設定 將使用系統預設的30s private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { // 嘗試拿鎖, 如果能拿到就直接返回 long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } // 如果拿不到鎖就嘗試一直輪循, 直到成功獲取鎖或者異常終止 try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } ... } } finally { unsubscribe(future, threadId); } }
1.1、tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; // 呼叫真正獲取鎖的操作 if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired // 這裡是成功獲取了鎖, 嘗試給鎖續約 if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } // 通過lua指令碼真正執行加鎖的操作 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // 如果key不存在, 那正好, 直接set並設定過期時間 // 如果key存在, 就有兩種情況需要考慮 // - 同一執行緒獲取重入鎖,直接將field(也就是getLockName(threadId))對應的value值+1 // - 不同執行緒競爭鎖, 此次加鎖失敗, 並直接返回此key對應的過期時間 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
1.2、續約
通過 scheduleExpirationRenewal 給鎖續約
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); // 續約操作 renewExpiration(); } } private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 設定延遲任務task, 在時長internalLockLeaseTime/3之後執行, 定期給鎖續期 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 真正執行續期命令操作 RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } // 這次續期之後, 繼續schedule自己, 達到持續續期的效果 if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); } // 所謂續期, 就是將expire過期時間再延長 protected RFuture<Boolean> renewExpirationAsync(long threadId) { // 如果key以及當前執行緒存在, 則延長expire時間, 並返回1代表成功;否則返回0代表失敗 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { ... } } public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<>(); // 執行解鎖操作 RFuture<Boolean> future = unlockInnerAsync(threadId); // 操作成功之後做的事 future.onComplete((opStatus, e) -> { // 取消續約task cancelExpirationRenewal(threadId); ... }); return result; } protected RFuture<Boolean> unlockInnerAsync(long threadId) { // 如果key以及當前執行緒對應的記錄已經不存在, 直接返回空 // 否在將field(也就是getLockName(threadId))對應的value減1 // - 如果減去1之後值還大於0, 那麼重新延長過期時間 // - 如果減去之後值小於等於0, 那麼直接刪除key, 並行布訂閱訊息 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
以上便是 redisson 使用者端工具對 redis 分散式鎖的加/解鎖具體實現,主要解決了以下幾個問題
1、死鎖問題:設定過期時間
2、可重入問題:重入+1, 釋放鎖-1,當值=0時代表完全釋放鎖
3、續約問題:可解決鎖提前釋放問題
4、鎖釋放:誰加鎖就由誰來釋放
本文由一個線上問題做引子,通過 redis 分散式鎖的常用實現方案,最終選定 redisson 的解決方案; 並分析 redisson 的具體實現細節
到此這篇關於Redisson如何解決Redis分散式鎖提前釋放問題的文章就介紹到這了,更多相關Redis分散式鎖提前釋放內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45