首頁 > 軟體

使用Redis實現分散式鎖的方法

2022-06-16 14:02:18

Redis 中的分散式鎖如何使用

分散式鎖的使用場景

為了保證我們線上服務的並行性和安全性,目前我們的服務一般拋棄了單體應用,採用的都是擴充套件性很強的分散式架構。

對於可變共用資源的存取,同一時刻,只能由一個執行緒或者程序去存取操作。這時候我們就需要做個標識,如果當前有執行緒或者程序在操作共用變數,我們就做個標記,標識當前資源正在被操作中, 其它的執行緒或者程序,就不能進行操作了。當前操作完成之後,刪除標記,這樣其他的執行緒或者程序,就能來申請共用變數的操作。通過上面的標記來保證同一時刻共用變數只能由一個執行緒或者進行持有。

對於單體應用:多個執行緒之間存取可變共用變數,比較容易處理,可簡單使用記憶體來儲存標示即可;

分散式應用:這種場景下比較麻煩,因為多個應用,部署的地址可能在不同的機房,一個在北京一個在上海。不能簡單的儲存標示在記憶體中了,這時候需要使用公共記憶體來記錄該標示,慄如 Redis,MySQL 。。。

使用 Redis 來實現分散式鎖

這裡來聊聊如何使用 Redis 實現分散式鎖

Redis 中分散式鎖一般會用 set key value px milliseconds nx 或者 SETNX+Lua來實現。

因為 SETNX 命令,需要配合 EXPIRE 設定過期時間,Redis 中單命令的執行是原子性的,組合命令就需要使用 Lua 才能保證原子性了。

看下如何實現

使用 set key value px milliseconds nx 實現

因為這個命令同時能夠設定鍵值和過期時間,同時Redis中的單命令都是原子性的,所以加鎖的時候使用這個命令即可

func (r *Redis) TryLock(ctx context.Context, key, value string, expire time.Duration) (isGetLock bool, err error) {
	// 使用 set nx
	res, err := r.Do(ctx, "set", key, value, "px", expire.Milliseconds(), "nx").Result()
	if err != nil {
		return false, err
	}
	if res == "OK" {
		return true, nil
	}
	return false, nil
}

SETNX+Lua 實現

如果使用 SETNX 命令,這個命令不能設定過期時間,需要配合 EXPIRE 命令來使用。

因為是用到了兩個命令,這時候兩個命令的組合使用是不能保障原子性的,在一些並行比較大的時候,需要配合使用 Lua 指令碼來保證命令的原子性。

func tryLockScript() string {
	script := `
		local key = KEYS[1]

		local value = ARGV[1] 
		local expireTime = ARGV[2] 
		local isSuccess = redis.call('SETNX', key, value)

		if isSuccess == 1 then
			redis.call('EXPIRE', key, expireTime)
			return "OK"
		end

		return "unLock"    `
	return script
}

func (r *Redis) TryLock(ctx context.Context, key, value string, expire time.Duration) (isGetLock bool, err error) {
	// 使用 Lua + SETNX
	res, err := r.Eval(ctx, tryLockScript(), []string{key}, value, expire.Seconds()).Result()
	if err != nil {
		return false, err
	}
	if res == "OK" {
		return true, nil
	}
	return false, nil
}

除了上面加鎖兩個命令的區別之外,在解鎖的時候需要注意下不能誤刪除別的執行緒持有的鎖

為什麼會出現這種情況呢,這裡來分析下

舉個栗子

1、執行緒1獲取了鎖,鎖的過期時間為1s;

2、執行緒1完成了業務操作,用時1.5s ,這時候執行緒1的鎖已經被過期時間自動釋放了,這把鎖已經被別的執行緒獲取了;

3、但是執行緒1不知道,接著去釋放鎖,這時候就會將別的執行緒的鎖,錯誤的釋放掉。

面對這種情況,其實也很好處理

1、設定 value 具有唯一性;

2、每次刪除鎖的時候,先去判斷下 value 的值是否能對的上,不相同就表示,鎖已經被別的執行緒獲取了;

看下程式碼實現

var UnLockErr = errors.New("未解鎖成功")
func unLockScript() string {
	script := `
		local value = ARGV[1] 
		local key = KEYS[1]
		local keyValue = redis.call('GET', key)
		if tostring(keyValue) == tostring(value) then
			return redis.call('DEL', key)
		else
			return 0
		end
    `
	return script
}
func (r *Redis) Unlock(ctx context.Context, key, value string) (bool, error) {
	res, err := r.Eval(ctx, unLockScript(), []string{key}, value).Result()
	if err != nil {
		return false, err
	}
	return res.(int64) != 0, nil
}

程式碼可參考lock

上面的這類鎖的最大缺點就是隻作用在一個節點上,即使 Redis 通過 sentinel 保證高可用,如果這個 master 節點由於某些原因放生了主從切換,那麼就會出現鎖丟失的情況:

1、在 Redis 的 master 節點上拿到了鎖;

2、但是這個加鎖的 key 還沒有同步到 slave 節點;

3、master 故障,發生了故障轉移,slave 節點升級為 master 節點;

4、導致鎖丟失。

針對這種情況如何處理呢,下面來聊聊 Redlock 演演算法

使用 Redlock 實現分散式鎖

在 Redis 的分散式環境中,我們假設有 N 個 Redis master。這些節點完全互相獨立,不存在主從複製或者其他叢集協調機制。我們確保將在 N 個範例上使用與在 Redis 單範例下相同方法獲取和釋放鎖。現在我們假設有 5 個 Redis master 節點,同時我們需要在5臺伺服器上面執行這些 Redis 範例,這樣保證他們不會同時都宕掉。

為了取到鎖,使用者端營該執行以下操作:

1、獲取當前Unix時間,以毫秒為單位。

2、依次嘗試從5個範例,使用相同的key和具有唯一性的 value(例如UUID)獲取鎖。當向 Redis 請求獲取鎖時,使用者端應該設定一個網路連線和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間為10秒,則超時時間應該在 5-50 毫秒之間。這樣可以避免伺服器端 Redis 已經掛掉的情況下,使用者端還在死死地等待響應結果。如果伺服器端沒有在規定時間內響應,使用者端應該儘快嘗試去另外一個 Redis 範例請求獲取鎖;

3、使用者端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖使用的時間。當且僅當從大多數(N/2+1,這裡是3個節點)的 Redis 節點都取到鎖,並且使用的時間小於鎖失效時間時,鎖才算獲取成功;

4、如果取到了鎖,key 的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果);

5、如果因為某些原因,獲取鎖失敗(沒有在至少N/2+1個 Redis 範例取到鎖或者取鎖時間已經超過了有效時間),使用者端應該在所有的Redis範例上進行解鎖(即便某些 Redis 範例根本就沒有加鎖成功,防止某些節點獲取到鎖但是使用者端沒有得到響應而導致接下來的一段時間不能被重新獲取鎖)。

根據官方的推薦,go 版本中 Redsync 實現了這一演演算法,這裡看下具體的實現過程

redsync專案地址

// LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) LockContext(ctx context.Context) error {
	if ctx == nil {
		ctx = context.Background()
	}
	value, err := m.genValueFunc()
	if err != nil {
		return err
	}
	
	for i := 0; i < m.tries; i++ {
		if i != 0 {
			select {
			case <-ctx.Done():
				// Exit early if the context is done.
				return ErrFailed
			case <-time.After(m.delayFunc(i)):
				// Fall-through when the delay timer completes.
			}
		}
		start := time.Now()
		// 嘗試在所有的節點中加鎖
		n, err := func() (int, error) {
			ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
			defer cancel()
			return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
				// acquire 加鎖函數
				return m.acquire(ctx, pool, value)
			})
		}()
		if n == 0 && err != nil {
			return err
		}
		// 如果加鎖節點書沒有達到的設定的數目
		// 或者鍵值的過期時間已經到了
		// 在所有的節點中解鎖
		now := time.Now()
		until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
		if n >= m.quorum && now.Before(until) {
			m.value = value
			m.until = until
			return nil
		}
		_, err = func() (int, error) {
			ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
			defer cancel()
			return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
				// 解鎖函數
				return m.release(ctx, pool, value)
			})
		}()
		if i == m.tries-1 && err != nil {
			return err
		}
	}
	return ErrFailed
}
// 遍歷所有的節點,並且在每個節點中執行傳入的函數
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
	type result struct {
		Status bool
		Err    error
	}
	ch := make(chan result)
	// 執行傳入的函數
	for _, pool := range m.pools {
		go func(pool redis.Pool) {
			r := result{}
			r.Status, r.Err = actFn(pool)
			ch <- r
		}(pool)
	}
	n := 0
	var err error
	// 計算執行成功的節點數目
	for range m.pools {
		r := <-ch
		if r.Status {
			n++
		} else if r.Err != nil {
			err = multierror.Append(err, r.Err)
		}
	}
	return n, err
}
// 手動解鎖的lua指令碼
var deleteScript = redis.NewScript(1, `
	if redis.call("GET", KEYS[1]) == ARGV[1] then
		return redis.call("DEL", KEYS[1])
	else
		return 0
	end
`)
// 手動解鎖
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
	conn, err := pool.Get(ctx)
	if err != nil {
		return false, err
	}
	defer conn.Close()
	status, err := conn.Eval(deleteScript, m.name, value)
	if err != nil {
		return false, err
	}
	return status != int64(0), nil
}

分析下思路

1、遍歷所有的節點,然後嘗試在所有的節點中執行加鎖的操作;

2、收集加鎖成功的節點數,如果沒有達到指定的數目,釋放剛剛新增的鎖;

關於 Redlock 的缺點可參見

How to do distributed locking

鎖的續租

Redis 中分散式鎖還有一個問題就是鎖的續租問題,當鎖的過期時間到了,但是業務的執行時間還沒有完成,這時候就需要對鎖進行續租了

續租的流程

1、當用戶端加鎖成功後,可以啟動一個定時的任務,每隔一段時間,檢查業務是否完成,未完成,增加 key 的過期時間;

2、這裡判斷業務是否完成的依據是:

  • 1、這個 key 是否存在,如果 key 不存在了,就表示業務已經執行完成了,也就不需要進行續租操作了;
  • 2、同時需要校驗下 value 值,如果 value 對應的值和之前寫入的值不同了,說明當前鎖已經被別的執行緒獲取了;

看下 redsync 中續租的實現

// Extend resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) Extend() (bool, error) {
	return m.ExtendContext(nil)
}

// ExtendContext resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
	start := time.Now() 
	// 嘗試在所有的節點中加鎖
	n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
		return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
	})
	if n < m.quorum {
		return false, err
	}
	// 判斷下鎖的過期時間
	now := time.Now()
	until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
	if now.Before(until) {
		m.until = until
		return true, nil
	}
	return false, ErrExtendFailed
}

var touchScript = redis.NewScript(1, `
	// 需要先比較下當前的value值
	if redis.call("GET", KEYS[1]) == ARGV[1] then
		return redis.call("PEXPIRE", KEYS[1], ARGV[2])
	else
		return 0
	end
`)

func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
	conn, err := pool.Get(ctx)
	if err != nil {
		return false, err
	}
	defer conn.Close()
	status, err := conn.Eval(touchScript, m.name, value, expiry)
	if err != nil {
		return false, err
	}
	return status != int64(0), nil
}

1、鎖的續租需要使用者端去監聽和操作,啟動一個定時器,固定時間來呼叫續租函數給鎖續租;

2、每次續租操作的時候需要匹配下當前的 value 值,因為鎖可能已經被當前的執行緒釋放了,當前的持有者可能是別的執行緒;

看看 SETEX 的原始碼

SETEX 能保證只有在 key 不存在時設定 key 的值,那麼這裡來看看,原始碼中是如何實現的呢

// https://github.com/redis/redis/blob/7.0/src/t_string.c#L78
// setGenericCommand()函數是以下命令: SET, SETEX, PSETEX, SETNX.的最底層實現
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    ...

    found = (lookupKeyWrite(c->db,key) != NULL);
    // 這裡是 SETEX 實現的重點
	// 如果nx,並且在資料庫中找到了這個值就返回
	// 如果是 xx,並且在資料庫中沒有找到鍵值就會返回
	
	// 因為 Redis 中的命令執行都是單執行緒操作的
	// 所以命令中判斷如果存在就返回,能夠保證正確性,不會出現並行存取的問題
    if ((flags & OBJ_SET_NX && found) ||
        (flags & OBJ_SET_XX && !found))
    {
        if (!(flags & OBJ_SET_GET)) {
            addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
        }
        return;
    }

    ...
}

1、命令的實現裡面加入了鍵值是否存在的判斷,來保證 NX 只有在 key 不存在時設定 key 的值;

2、因為 Redis 中總是一個執行緒處理命令的執行,單命令是能夠保證原子性,不會出現並行的問題。

為什麼 Redis 可以用來做分散式鎖

分散式鎖需要滿足的特性

  • 互斥性:在任意時刻,對於同一個鎖,只有一個使用者端能持有,從而保證一個共用資源同一時間只能被一個使用者端操作;
  • 安全性:即不會形成死鎖,當一個使用者端在持有鎖的期間崩潰而沒有主動解鎖的情況下,其持有的鎖也能夠被正確釋放,並保證後續其它使用者端能加鎖;
  • 可用性:當提供鎖服務的節點發生宕機等不可恢復性故障時,“熱備” 節點能夠接替故障的節點繼續提供服務,並保證自身持有的資料與故障節點一致。
  • 對稱性:對於任意一個鎖,其加鎖和解鎖必須是同一個使用者端,即使用者端 A 不能把使用者端 B 加的鎖給解了。

那麼 Redis 對上面的特性是如何支援的呢?

1、Redis 中命令的執行都是單執行緒的,雖然在 Redis6.0 的版本中,引入了多執行緒來處理 IO 任務,但是命令的執行依舊是單執行緒處理的;

2、單執行緒的特點,能夠保證命令的執行的是不存在並行的問題,同時命令執行的原子性也能得到保證;

3、Redis 中提供了針對 SETNX 這樣的命令,能夠保證同一時刻是隻會有一個請求執行成功,提供互斥性的保障;

4、Redis 中也提供了 EXPIRE 超時釋放的命令,可以實現鎖的超時釋放,避免死鎖的出現;

5、高可用,針對如果發生主從切換,資料丟失的情況,Redis 引入了 RedLock 演演算法,保證了 Redis 中主要大部分節點正常執行,鎖就可以正常執行;

6、Redis 中本身沒有對鎖提供續期的操作,不過一些第三方的實現中實現了 Redis 中鎖的續期,類似 使用 java 實現的 Redisson,使用 go 實現的 redsync,當然自己實現也不是很難,實現過程可參見上文。

總體來說,Redis 中對分散式鎖的一些特性都提供了支援,使用 Redis 實現分散式鎖,是一個不錯的選擇。

分散式鎖如何選擇

1、如果業務規模不大,qps 很小,使用 Redis,etcd,ZooKeeper 去實現分散式鎖都不會有問題,就看公司了基礎架構了,如果有現成的 Redis,etcd,ZooKeeper 直接用就可以了;

2、Redis 中分散式鎖有一定的安全隱患,如果業務中對安全性要求很高,那麼 Redis 可能就不適合了,etcd 或者 ZooKeeper 就比較合適了;

3、如果系統 qps 很大,但是可以容忍一些錯誤,那麼 Redis 可能就更合適了,畢竟 etcd或者ZooKeeper 背面往往都是較低的吞吐量和較高的延遲。

總結

1、在分散式的場景下,使用分散式鎖是我們經常遇到的一種場景;

2、使用 Redis 實現鎖是個不錯的選擇,Redis 的單命令的執行是原子性的同時藉助於 Lua 也可以很容易的實現組合命令的原子性;

3、針對分散式場景下主從切換,資料同步不及時的情況,redis 中引入了 redLock 來處理分散式鎖;

4、根據 martin 的描述,redLock 是繁重的,且存在安全性,不過我們可以根據自己的業務場景做出判斷;

5、需要注意的是在設定分散式鎖的時候需要設定 value 的唯一性,並且每次主動刪除鎖的時候需要匹配下 value 的正確性,避免誤刪除其他執行緒的鎖;

參考

【Redis核心技術與實戰】https://time.geekbang.org/column/intro/100056701
【Redis設計與實現】https://book.douban.com/subject/25900156/
【Redis 的學習筆記】https://github.com/boilingfrog/Go-POINT/tree/master/redis
【Redis 分散式鎖】https://redis.io/docs/reference/patterns/distributed-locks/
【How to do distributed locking】https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
【etcd 實現分散式鎖】https://www.cnblogs.com/ricklz/p/15033193.html
【Redis中的原子操作(3)-使用Redis實現分散式鎖】https://boilingfrog.github.io/2022/06/15/ Redis中的原子操作 (3)-使用Redis實現分散式鎖/

到此這篇關於使用Redis實現分散式鎖的方法的文章就介紹到這了,更多相關Redis分散式鎖內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com