<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在之前探討延時佇列的文章中我們提到了 redisson delayqueue 使用 redis 的有序集合結構實現延時佇列,遺憾的是 go 語言社群中並無類似的庫。不過問題不大,沒有輪子我們自己造。
本文的完整程式碼實現在hdt3213/delayqueue,可以直接 go get 安裝使用。
使用有序集合結構實現延時佇列的方法已經廣為人知,無非是將訊息作為有序集合的 member, 投遞時間戳作為 score 使用 zrangebyscore 命令搜尋已到投遞時間的訊息然後將其發給消費者。
然而訊息佇列不是將訊息發給消費者就萬事大吉,它們還有一個重要職責是確保送達和消費。通常的實現方式是當消費者收到訊息後向訊息佇列返回確認(ack),若消費者返回否定確認(nack)或超時未返回,訊息佇列則會按照預定規則重新傳送,直到到達最大重試次數後停止。如何實現 ack 和重試機制是我們要重點考慮的問題。
我們的訊息佇列允許分散式地部署多個生產者和消費者,消費者範例定時執行 lua 指令碼驅動訊息在佇列中的流轉無需部署額外元件。由於 Redis 保證了 lua 指令碼執行的原子性,整個流程無需加鎖。
消費者採用拉模式獲得訊息,保證每條訊息至少投遞一次,訊息佇列會重試超時或者被否定確認的訊息(nack) 直至到達最大重試次數。一條訊息最多有一個消費者正在處理,減少了要考慮的並行問題。
請注意:若消費時間超過了 MaxConsumeDuration 訊息佇列會認為消費超時並重新投遞,此時可能有多個消費者同時消費。
具體使用也非常簡單,只需要註冊處理訊息的回撥函數並呼叫 start() 即可:
package main import ( "github.com/go-redis/redis/v8" "github.com/hdt3213/delayqueue" "strconv" "time" ) func main() { redisCli := redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", }) queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool { // 註冊處理訊息的回撥函數 // 返回 true 表示已成功消費,返回 false 訊息佇列會重新投遞次訊息 return true }) // 傳送延時訊息 for i := 0; i < 10; i++ { err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3)) if err != nil { panic(err) } } // start consume done := queue.StartConsume() <-done }
由於資料儲存在 redis 中所以我們最多能保證在 redis 無故障且訊息佇列相關 key 未被外部篡改的情況下不會丟失訊息。
訊息佇列涉及幾個關鍵的 redis 資料結構:
流程如下圖所示:
由於我們允許分散式地部署多個消費者,每個消費者都在定時執行 lua 指令碼,所以多個消費者可能處於上述流程中不同狀態,我們無法預知(或控制)上圖中五個操作發生的先後順序,也無法控制有多少範例正在執行同一個操作。
因此我們需要保證上圖中五個操作滿足三個條件:
只要滿足這三個條件,我們就可以部署多個範例且不需要使用分散式鎖等技術來進行狀態同步。
是不是聽起來有點嚇人?其實簡單的很,讓我們一起來詳細看看吧~
pending2ReadyScript 使用 zrangebyscore 掃描已到投遞時間的訊息ID並把它們移動到 ready 中:
-- keys: pendingKey, readyKey -- argv: currentTime local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 從 pending key 中找出已到投遞時間的訊息 if (#msgs == 0) then return end local args2 = {'LPush', KEYS[2]} -- 將他們放入 ready key 中 for _,v in ipairs(msgs) do table.insert(args2, v) end redis.call(unpack(args2)) redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 從 pending key 中刪除已投遞的訊息
ready2UnackScript 從 ready 或者 retry 中取出一條訊息傳送給消費者並放入 unack 中,類似於 RPopLPush:
-- keys: readyKey/retryKey, unackKey -- argv: retryTime local msg = redis.call('RPop', KEYS[1]) if (not msg) then return end redis.call('ZAdd', KEYS[2], ARGV[1], msg) return msg
unack2RetryScript 從 retry 中找出所有已到重試時間的訊息並把它們移動到 unack 中:
-- keys: unackKey, retryCountKey, retryKey, garbageKey -- argv: currentTime local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 找到已到重試時間的訊息 if (#msgs == 0) then return end local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查詢剩餘重試次數 for i,v in ipairs(retryCounts) do local k = msgs[i] if tonumber(v) > 0 then -- 剩餘次數大於 0 redis.call("HIncrBy", KEYS[2], k, -1) -- 減少剩餘重試次數 redis.call("LPush", KEYS[3], k) -- 新增到 retry key 中 else -- 剩餘重試次數為 0 redis.call("HDel", KEYS[2], k) -- 刪除重試次數記錄 redis.call("SAdd", KEYS[4], k) -- 新增到垃圾桶,等待後續刪除 end end redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 將已處理的訊息從 unack key 中刪除
因為 redis 要求 lua 指令碼必須在執行前在 KEYS 引數中宣告自己要存取的 key, 而我們將每個 msg 有一個獨立的 key,我們在執行 unack2RetryScript 之前是不知道哪些 msg key 需要被刪除。所以 lua 指令碼只將需要刪除的訊息記在 garbage key 中,指令碼執行完後再通過 del 命令將他們刪除:
func (q *DelayQueue) garbageCollect() error { ctx := context.Background() msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result() if err != nil { return fmt.Errorf("smembers failed: %v", err) } if len(msgIds) == 0 { return nil } // allow concurrent clean msgKeys := make([]string, 0, len(msgIds)) for _, idStr := range msgIds { msgKeys = append(msgKeys, q.genMsgKey(idStr)) } err = q.redisCli.Del(ctx, msgKeys...).Err() if err != nil && err != redis.Nil { return fmt.Errorf("del msgs failed: %v", err) } err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err() if err != nil && err != redis.Nil { return fmt.Errorf("remove from garbage key failed: %v", err) } return nil }
之前提到的 lua 指令碼都是原子性執行的,不會有其它命令插入其中。 gc 函數由 3 條 redis 命令組成,在執行過程中可能會有其它命令插入執行過程中,不過考慮到一條訊息進入垃圾回收流程之後不會復活所以不需要保證 3 條命令原子性。
ack 只需要將訊息徹底刪除即可:
func (q *DelayQueue) ack(idStr string) error { ctx := context.Background() err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err() if err != nil { return fmt.Errorf("remove from unack failed: %v", err) } // msg key has ttl, ignore result of delete _ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err() q.redisCli.HDel(ctx, q.retryCountKey, idStr) return nil }
否定確認只需要將 unack key 中訊息的重試時間改為現在,隨後執行的 unack2RetryScript 會立即將它移動到 retry key
func (q *DelayQueue) nack(idStr string) error { ctx := context.Background() // update retry time as now, unack2Retry will move it to retry immediately err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{ Member: idStr, Score: float64(time.Now().Unix()), }).Err() if err != nil { return fmt.Errorf("negative ack failed: %v", err) } return nil }
訊息佇列的核心邏輯是每秒執行一次的 consume 函數,它負責呼叫上述指令碼將訊息轉移到正確的集合中並回撥 consumer 來消費訊息:
func (q *DelayQueue) consume() error { // 執行 pending2ready,將已到時間的訊息轉移到 ready err := q.pending2Ready() if err != nil { return err } // 迴圈呼叫 ready2Unack 拉取訊息進行消費 var fetchCount uint for { idStr, err := q.ready2Unack() if err == redis.Nil { // consumed all break } if err != nil { return err } fetchCount++ ack, err := q.callback(idStr) if err != nil { return err } if ack { err = q.ack(idStr) } else { err = q.nack(idStr) } if err != nil { return err } if fetchCount >= q.fetchLimit { break } } // 將 nack 或超時的訊息放入重試佇列 err = q.unack2Retry() if err != nil { return err } // 清理已達到最大重試次數的訊息 err = q.garbageCollect() if err != nil { return err } // 消費重試佇列 fetchCount = 0 for { idStr, err := q.retry2Unack() if err == redis.Nil { // consumed all break } if err != nil { return err } fetchCount++ ack, err := q.callback(idStr) if err != nil { return err } if ack { err = q.ack(idStr) } else { err = q.nack(idStr) } if err != nil { return err } if fetchCount >= q.fetchLimit { break } } return nil }
至此一個簡單可靠的延時佇列就做好了,何不趕緊開始試用呢?
以上就是Golang實現基於Redis的可靠延遲佇列的詳細內容,更多關於Golang Redis可靠延遲佇列的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45