首頁 > 軟體

基於Redis實現延時佇列的優化方案小結

2022-07-05 18:07:16

一、延時佇列的應用

近期在開發部門的新專案,其中有個關鍵功能就是智慧推播,即根據使用者行為在特定的時間點向用戶推播相應的提醒訊息,比如以下業務場景:

  • 在使用者點選充值項後,半小時內未充值,向用戶推播充值未完成提醒。
  • 在使用者最近一次閱讀行為2小時後,向用戶推播繼續閱讀提醒。
  • 在使用者新註冊或退出應用N分鐘後,向用戶推播合適的推薦訊息。

上述場景的共同特徵就是在某事件觸發後延遲一定時間後再執行特定任務,若事件觸發時間點可知,則上述邏輯也可等價於在指定時間點(事件觸發時間點+延遲時間長度)執行特定任務。實現這類需求一般採用延時佇列,其中建立的延時訊息中需要包含任務延遲時間或任務執行時間點等資訊,當任務滿足時間條件需要執行時,該訊息便會被消費,也就是說可以指定佇列中的訊息在哪個時間點被消費。

二、延時佇列的實現

在單機環境中,JDK已經自帶了很多能夠實現延時佇列功能的元件,比如DelayQueue, Timer, ScheduledExecutorService等元件,都可以較為簡便地建立延時任務,但上述元件使用一般需要把任務儲存在記憶體中,服務重啟存在任務丟失風險,且任務規模體量受記憶體限制,同時也造成長時間記憶體佔用,並不靈活,通常適用於單程序客服端程式中或對任務要求不高的專案中。

在分散式環境下,僅使用JDK自帶元件並不能可靠高效地實現延時佇列,通常需要引入第三方中介軟體或框架。

比如常見的經典任務排程框架Quartz或基於此框架的xxl-job等其它框架,這些框架的主要功能是實現定時任務或週期性任務,在Redis、RabbitMQ還未廣泛應用時,譬如常見的超時未支付取消訂單等功能都是由定時任務實現的,通過定時輪詢來判斷是否已到達觸發執行的時間點。但由於定時任務需要一定的週期性,週期掃描的間隔時間不好控制,太短會造成很多無意義的掃描,且增大系統壓力,太長又會造成執行時間誤差太大,且可能造成單次掃描所處理的堆積記錄數量過大。

此外,利用MQ做延時佇列也是一種常見的方式,比如通過RabbitMQ的TTL和死信佇列實現訊息的延遲投遞,考慮到投遞出去的MQ訊息無法方便地實現刪除或修改,即無法實現任務的取消或任務執行時間點的更改,同時也不能方便地對訊息進行去重,因此在專案中並未選擇使用MQ實現延時佇列。

Redis的資料結構zset,同樣可以實現延遲佇列的效果,且更加靈活,可以實現MQ無法做到的一些特性,因此專案最終採用Redis實現延時佇列,並對其進行優化與封裝。

實現原理是利用zset的score屬性,redis會將zset集合中的元素按照score進行從小到大排序,通過zadd命令向zset中新增元素,如下述命令所示,其中value值為延時任務訊息,可根據業務定義訊息格式,score值為任務執行的時間點,比如13位毫秒時間戳。

zadd delayqueue 1614608094000 taskinfo

任務新增後,獲取任務的邏輯只需從zset中篩選score值小於當前時間戳的元素,所得結果便是當前時間節點下需要執行的任務,通過zrangebyscore命令來獲取,如下述命令所示,其中timestamp為當前時間戳,可用limit限制每次拉取的記錄數,防止單次獲取記錄數過大。

zrangebyscore delayqueue 0 timestamp limit 0 1000

在實際實現過程中,從zset中獲取到當前需要執行的任務後,需要先確保將任務對應的元素從zset中刪除,刪除成功後才允許執行任務邏輯,這樣是為了在分散式環境下,當存在多個執行緒獲取到同一任務後,利用redis刪除操作的原子性,確保只有一個執行緒能夠刪除成功並執行任務,防止重複執行。實際任務的執行通常會再將其傳送至MQ非同步處理,將“獲取任務”與“執行任務”兩者分離解耦,更加靈活,“獲取任務”只負責拿到當前時間需要執行的任務,並不真正執行任務業務邏輯,因此只需相對少量的執行執行緒即可,而實際的任務執行邏輯則由MQ消費者承擔,方便調控負載能力。整體過程如下圖所示。

採用zset做延時佇列的另一個好處是可以實現任務的取消和任務執行時間點的更改,只需要將任務資訊從zset中刪除,便可取消任務,同時由於zset擁有集合去重的特性,只需再次寫入同一個任務資訊,但是value值設定為不同的執行時間點,便可更改任務執行時間,實現單個任務執行時間的動態調整。

瞭解實現原理後,再進行具體程式設計實現。建立延時任務較為簡便,準備好任務訊息和執行時間點,寫入zset即可。獲取延時任務最簡單的方案是通過定時任務,週期性地執行上述邏輯,如下程式碼所示。

@XxlScheduled(cron = "0/5 * * * * ?", name = "scan business1 delayqueue")
public void scanBusiness1() {
    // 某業務邏輯的zset延遲佇列對應的key
    String zsetKey = "delayqueue:business1";
    while (true) {
        // 篩選score值小於當前時間戳的元素,一次最多拉取1000條
        Set<String> tasks = stringRedisTemplate.opsForZSet().rangeByScore(zsetKey, 0, System.currentTimeMillis(), 0, 1000);
        if (CollectionUtils.isEmpty(tasks)) {
            // 當前時間下已沒有需要執行的任務,結束本次掃描
            return;
        }
        for (String task : tasks) {
            // 先刪除,再執行,確保多執行緒環境下執行的唯一性
            Boolean delete = stringRedisTemplate.delete(task);
            if (delete) {
                // 刪除成功後,將其再傳送到指定MQ非同步處理,將「獲取任務」與「執行任務」分離解耦
                rabbitTemplate.convertAndSend("exchange_business1", "routekey_business1", task);
            }
        }
    }
}

上述方案使用xxl-job做分散式定時任務,間隔5秒執行一次,程式碼藉助spring提供的api來完成redis和MQ的操作。由於是分散式定時任務,每次執行只有一個執行緒在獲取任務,機器利用率低,當資料規模較大時,單靠一個執行緒無法滿足吞吐量要求,因此這種方案只適用於小規模資料量級別。此處間隔時間也可適當調整,例如縮短為1秒,調整所需考慮原則在上文已提到:間隔太短會造成很多無意義的掃描,且增大系統壓力,太長又會造成執行時間誤差太大。

為了提升整體吞吐量,考慮不使用分散式定時任務,對叢集內每臺機器(或範例)均設定獨立的定時任務,同時採用多個zset佇列,以數位字尾區分。假設有M個zset佇列,建立延時訊息時選取訊息的某個ID欄位,計算hash值再對M取餘,根據餘數決定傳送到對應數位字尾的zset佇列中(分散訊息,此處ID欄位選取需要考慮做到均勻分佈,不要造成資料傾斜)。佇列數量M的選取需要考慮機器數量N,理想情況下有多少臺機器就定義多少個佇列,保持M與N基本相等即可。因為佇列太少,會造成機器對佇列的競爭存取處理,佇列太多又會導致任務得不到及時的處理。最佳實踐是佇列數量可動態設定,如採用分散式設定中心,這樣當叢集機器數量變化時,可以相應調整佇列數量。

每臺機器在觸發定時任務時,需要通過適當的負載均衡來決定從哪個佇列拉取訊息,負載均衡的好壞也會影響整個叢集的效率,如果負載分佈不均可能會導致多臺機器競爭處理同一佇列,降低效率。一個簡單實用的做法是利用redis的自增操作再對佇列數量取餘即可,只要保持佇列數量和機器數量基本相等,這種做法在很大程度上就可以保證不會有多臺機器競爭同一佇列。至於每臺機器從對應zset中的任務獲取邏輯,仍然和前面程式碼一致。以上方式簡化實現程式碼如下所示。

@Scheduled(cron = "0/5 * * * * ?")
public void scanBusiness1() {
    // 佇列數量M,考慮動態設定,保持和機器數量基本一致
    int M = 10;
    // redis自增key,用於負載均衡
    String incrKey = "incrkey:delayqueue:business1";
    // 每臺機器執行時,從不同的zset中拉取訊息,儘量確保不同機器存取不同zset
    String zsetKey = "delayqueue:business1:" + (stringRedisTemplate.opsForValue().increment(incrKey) % M);
    while (true) {
        // 此處邏輯和前面程式碼一致,省略。。。
    }
}

上述方案和第一種方案的主要的不同點在於zsetKey的獲取上,這裡是根據負載均衡演演算法算出來的,確保每臺機器存取不同zset並拉取訊息,同時定時任務採用spring提供的程序內註解@Scheduled,叢集內每臺機器都會間隔5秒執行,因此相比之前的方案,能夠較為明顯地提升整個叢集的吞吐量。但是這種方案的步驟相對更為複雜,需要動態設定佇列數量,同時在建立延時任務時需要選擇合適的訊息ID欄位來決定傳送的目標zset佇列,此處還要考慮均勻分佈,整體實現要考慮的因素較多。

上面一種方案已經能夠較好地滿足整體吞吐量要求,但其缺點是步驟相對複雜,因此專案中沒有采用這種方案,而是採用下面一種也能滿足吞吐量要求,步驟相對簡單,又方便通用化的方案。

該方案不使用定時任務,而是單獨啟動後臺執行緒,線上程中執行永久迴圈,每次迴圈邏輯為:從目標zset中獲取score值小於當前時間戳的元素集合中的score最小的那個元素,相當於獲取當前時間點需要執行且執行時間點最早的那個任務,如果獲取不到,表示當前時間點下暫無需要執行的任務,則執行緒休眠100ms(可視情況調整),否則,對獲取到的元素進行處理,在分散式多執行緒環境下,仍然需要先刪除成功才能進行處理。此外,考慮到每個執行緒獲取元素後都需要再次存取redis嘗試刪除操作,為了避免多執行緒爭搶浪費資源,降低效率,這裡採用lua指令碼將獲取和刪除操作原子化。lua指令碼邏輯程式碼如下所示。

local zsetKey = 'delayqueue'
local timestamp = 1614608094000
local items = redis.call('zrangebyscore',zsetKey,0,timestamp,'limit',0,1)
if #items == 0 then
    return ''
else
    redis.call('zremrangebyrank',zsetKey,0,0)
    return items[1]
end

其中timestamp為當前時間戳,通過在zrangebyscore命令中指定limit為1來獲取score最小的元素,若獲取不到,即結果集長度為0,則返回空字串,否則,通過zremrangebyrank命令刪除頭部元素,即score最小的元素,也就是之前獲取到的那個元素,由於redis內部保證lua指令碼的原子性,上述獲取並刪除的操作能夠執行無誤。具體JAVA實現中還對其進行了多執行緒操作的封裝和通用化的抽象,使不同業務都能夠使用該元件實現延時佇列。具體實現程式碼如下所示。

/**
 * 基於ZSET實現訊息延遲處理,score儲存執行時間點,到達時間點即會向指定佇列傳送該訊息;
 * 定義一個繼承本類的bean即可;
 */
public abstract class AbstractDelayedMsgScanTrigger implements Runnable, DisposableBean {

    private static final RedisScript<String> TRY_GET_AND_DEL_SCRIPT;
    static {
        // 獲取並刪除的lua指令碼,使用spring提供的api
        String sb = "local items = redis.call('zrangebyscore',KEYS[1],0,ARGV[1],'limit',0,1)n" +
                "if #items == 0 thenn" +
                "treturn ''n" +
                "elsen" +
                "tredis.call('zremrangebyrank',KEYS[1],0,0)n" +
                "treturn items[1]n" +
                "end";
        // 自有工具類,只要能建立出spring包下的 RedisScript 的實現類物件均可
        TRY_GET_AND_DEL_SCRIPT = RedisScriptHelper.createScript(sb, String.class);
    }

    private final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(getThreadNum(), getThreadNum(),
            0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadNamePrefix()));
    private volatile boolean quit = false;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void startScan() {
        // bean構建完成後,啟動若干執行執行緒
        int threadNum = getThreadNum();
        for (int i = 0; i < threadNum; i++) {
            EXECUTOR.execute(this);
        }
    }

    @Override
    public void run() {
        while (!quit) {
            try {
                // 迴圈,採用lua獲取當前需要執行的任務並將其從redis中刪除
                String msg = stringRedisTemplate.execute(TRY_GET_AND_DEL_SCRIPT,
                        Lists.newArrayList(getDelayedMsgSourceKey()), String.valueOf(System.currentTimeMillis()));
                if (StringUtils.isNotBlank(msg)) {
                    // 訊息不為空,表示獲取任務成功,將其再傳送到指定MQ非同步處理,將「獲取任務」與「執行任務」分離解耦
                    rabbitTemplate.convertAndSend(getSendExchange(), getSendRoutingKey(), msg);
                } else {
                    // 獲取不到任務,表示當前時間點下暫無需要執行的任務,則執行緒休眠1S(可視情況調整)
                    SleepUtils.sleepSeconds(1);
                }
            } catch (Exception e) {
                Logs.MSG.error("delayed msg scan error, sourceKey:{}", getDelayedMsgSourceKey(), e);
            }
        }
    }

    @Override
    public void destroy() throws Exception {
        quit = true;
    }

    public void setQuit(boolean quit) {
        this.quit = quit;
    }

    /**
     * 獲取訊息的工作執行緒數量
     */
    protected abstract int getThreadNum();

    /**
     * 執行緒名稱字首,方便問題定位
     */
    protected abstract String getThreadNamePrefix();

    /**
     * 存放延遲訊息的ZSET佇列名
     */
    protected abstract String getDelayedMsgSourceKey();

    /**
     * 訊息到達執行時間點時將其通過指定 exchange 傳送到實時消費佇列中
     */
    protected abstract String getSendExchange();

    /**
     * 訊息到達執行時間點時將其通過指定 routingKey 傳送到實時消費佇列中
     */
    protected abstract String getSendRoutingKey();

}

在具體業務應用中,只需定義一個繼承上述類的bean即可,需要實現的方法主要是提供一些設定,比如該業務對應的zset延時佇列名稱,同時工作拉取訊息的執行緒數量,由於採用rabbitMq,因此這裡需要提供exchange和routingKey。實際使用中只需向該zset佇列中新增訊息,並將score設為該任務需要執行的時間點(此處為13位毫秒時間戳),則到該時間點後,上述元件便會將該訊息從zset中取出並刪除,再將其通過指定的路由傳送到實時MQ消費佇列中,由消費者負責執行任務業務邏輯。目前該元件在專案中正常平穩執行。

三、總結

本文結合專案中的實際需求介紹了延時佇列的應用場景,分析了延時佇列的多種實現,重點講述了利用redis實現延時佇列的原理,對其實現方案進行比較與優化,並將最終方案實際運用於專案需求中。

到此這篇關於基於Redis實現延時佇列的優化方案小結的文章就介紹到這了,更多相關Redis 延時佇列內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com