首頁 > 軟體

Redisson 分散式延時佇列 RedissonDelayedQueue 執行流程

2022-09-30 14:02:03

前言

因為工作中需要用到分散式的延時佇列,調研了一段時間,選擇使用 RedissonDelayedQueue,為了搞清楚內部執行流程,特記錄下來。

總體流程大概是圖中的這個樣子,初看一眼有點不知從何下手,接下來我會通過以下幾點來分析流程,相信看完本文你能瞭解整個執行流程。

  • 基本使用
  • 內部資料結構介紹
  • 基本流程
  • 傳送延時訊息
  • 獲取延時訊息
  • 初始化延時佇列

基本使用

傳送延遲訊息程式碼如下,傳送了一條延遲時間為 5s 的訊息。

    public void produce() {
        String queuename = "delay-queue";
        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
        delayedQueue.offer("測試延遲訊息", 5, TimeUnit.SECONDS);
    }

接收訊息程式碼如下,可以看到 delayedQueue 是沒有用到的,那麼為什麼要加這一行呢,這個後面總結部分回答。

    public void consume() throws InterruptedException {
        String queuename = "delay-queue";
        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
        String msg = blockingQueue.take();
        //收到訊息進行處理...
    }

這兩段程式碼可以寫在兩個不同的 Java 工程裡,只要連線的是同一個 Redis 就行。

呼叫 comsume() 之後,如果佇列裡沒有訊息,會阻塞等待佇列裡有訊息並且取到了才會返回。之所以這麼說是因為可能有別的 Java 程序也在跟你一樣取同一個佇列裡的訊息,如果訊息被另一個搶完了,那這時就還得阻塞等待。

這時看上去的原理是這樣的:

生產者呼叫 offer() 後,自己內部開啟一個定時器,等到了時間在傳送到 redis 的 list 裡。

如果是這樣設計的話,相信大家都能看出來一個很簡單的問題,要是延時時間還沒到,生產者自己掛了,那樣訊息就丟了。所以,還是讓我們接著往下看。

內部資料結構介紹

redisson 原始碼裡一共建立了三個佇列:【訊息延時佇列】、【訊息順序佇列】、【訊息目標佇列】。

假設在同一時間按照 msg1、msg2、msg3 的順序發訊息到延時佇列,這三條訊息就會被儲存在【訊息延時佇列】和【訊息順序佇列】。

可以看到【訊息延時佇列】的順序是按照到期時間升序排列的,而不是像【訊息順序佇列】按照插入順序排。

訊息到期後會將訊息從前兩個佇列移除(怎麼移?誰來移?),插入【訊息目標佇列】,也就是圖中第三個佇列。

消費者也是阻塞在【訊息目標佇列】上取訊息。

這時可以簡單說明下每個佇列的作用:

  • 【訊息延時佇列】利用按照到期時間排序的特性,可以很快找到下一個要到期的訊息,使用者端內部自己定時到
  • 【訊息目標佇列】取
  • 【訊息順序佇列】這個佇列對分析的流程關聯不大,可以忽略
  • 【訊息目標佇列】存放到期的訊息,供消費端取

其實【訊息延時佇列】佇列裡存的時間(也就是 zet 的 score)是到期的時間戳,為了畫圖方便,圖裡就畫的是延遲的時間,不過不影響理解。

理解好這幾個佇列的名字和作用,後面還會一直用到,如果忘了可以翻回來回顧下。

因為書寫理解方便和【訊息順序佇列】在本文沒涉及到,後面部分好幾次提到的內容:把到期的訊息從【訊息延時佇列】移到【訊息目標佇列】裡,這句話實際的程式碼邏輯是這樣:把【訊息延時佇列】和【訊息順序佇列】裡的到期訊息移除,把它們插入到【訊息目標佇列】。

基本流程

知道了內部所使用到的資料結構後,這裡可以簡單說下整體的基本流程。

先說傳送延遲訊息,傳送的延遲訊息會先存在【訊息延時佇列】和【訊息順序佇列】,如果【訊息延時佇列】原本是空的,會發布訂閱資訊提醒有新的訊息。

獲取延遲訊息只需要從【訊息目標佇列】阻塞的取就行了,因為裡面都是到期資料。

那麼問題就只剩下怎麼樣判斷時間到了,把【訊息延時佇列】裡的訊息移動到【訊息目標佇列】裡呢?

這部分工作交給了初始化延時佇列來處理。

這裡面會定時從【訊息延時佇列】查詢最新到期時間,定時去把【訊息延時佇列】裡的訊息移動到【訊息目標佇列】裡。

如果【訊息延時佇列】是空的,就不會再定時查,而是等待發布訂閱資訊提醒,再定時把【訊息延時佇列】裡的訊息移動到【訊息目標佇列】裡。

剛開始看可能有點抽象,可以看完底下一節內容之後,再回頭來看這裡對應的流程總結,可能會比較清晰。

傳送延時訊息

傳送延時訊息的邏輯比較簡單,先看下傳送的程式碼。

    public void produce() {
        String queuename = "delay-queue";
        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
        delayedQueue.offer("測試延遲訊息", 5, TimeUnit.SECONDS);
    }

從 delayedQueue.offer 方法開始,最終會執行到 RedissonDelayedQueue 的 offerAsync 方法裡。

offerAsync 方法的作用就是傳送一段指令碼給 redis 執行,指令碼內容是:

  • 將訊息和到期時間插入【訊息延時佇列】和【訊息順序佇列】
  • 如果最近到期的訊息是剛剛插入的訊息,則對指定主題釋出到期時間,目的是為了讓使用者端定時去把【訊息延時佇列】裡的到期資料移動到【訊息目標佇列】
    @Override
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        if (delay < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }
        
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;
     
        long randomId = ThreadLocalRandom.current().nextLong();
        return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime 
              // to all scheduler workers 
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;",
              Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
              timeout, randomId, encode(e));
    }

獲取延時訊息

獲取延時訊息是本文最簡單的一部分。

    public void consume() throws InterruptedException {
        String queuename = "delay-queue";
        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
        String msg = blockingQueue.take();
        //收到訊息進行處理...
    }

blockingQueue.take() 方法其實只是對【訊息目標佇列】執行 blpop 阻塞的獲取到期訊息

初始化延時佇列

看一下初始化的程式碼。

public void init() {
    String queuename = "delay-queue";
    RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
    RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
}

入口就是在 redissonClient.getDelayedQueue(blockingQueue) 中,建立了 RedissonDelayedQueue 物件,並執行了構造方法裡的邏輯。

那麼這裡面主要做了什麼事呢?

主要是呼叫了 QueueTransferTask 的 start() 方法。

    public void start() {
        RTopic schedulerTopic = getTopic();
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });
        
        messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }

這段程式碼主要是設定了指定主題(主題名:redisson_delay_queue_channel:{queuename})兩個釋出訂閱的監聽器。

  • 當指定主題有新訂閱時呼叫 pushTask() 方法,裡面又會呼叫 pushTaskAsync() 方法
  • 當指定主題有新訊息時呼叫 scheduleTask(startTime) 方法

需要注意的是,這裡會先訂閱指定主題,然後觸發執行 onSubscribe() 方法。

所以我們主要搞懂這三個方法都是做什麼的,那麼整個初始化流程就明白了。

因為這三個方法是相互呼叫的,只看文字的話容易雲裡霧裡,這裡有個流程圖,看方法解釋文字的時候可以對照著流程圖看比較有印象。

scheduleTask()

這個方法看起來多,但核心內容就是根據方法引數指定的時間呼叫 pushTask()。

    private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {
            return;
        }

        if (oldTimeout != null) {
            oldTimeout.getTask().cancel();
        }

        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();

                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }

pushTaskAsync()

這個方法是抽象方法,在建立 RedissonDelayedQueue 物件的時候傳進來的,程式碼如下:

    @Override
    protected RFuture<Long> pushTaskAsync() {
        return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                        + "if #expiredValues > 0 then "
                        + "for i, v in ipairs(expiredValues) do "
                        + "local randomId, value = struct.unpack('dLc0', v);"
                        + "redis.call('rpush', KEYS[1], value);"
                        + "redis.call('lrem', KEYS[3], 1, v);"
                        + "end; "
                        + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                        + "end; "
                        // get startTime from scheduler queue head task
                        + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                        + "if v[1] ~= nil then "
                        + "return v[2]; "
                        + "end "
                        + "return nil;",
                Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
                System.currentTimeMillis(), 100);
    }

看不懂也不要緊,聽我解釋下就明白了。

這裡傳送了一段指令碼給 redis 執行:

  • 從【訊息延時佇列】取出前一百條到期的訊息,如果有的話,新增到【訊息目標佇列】裡,並將這些訊息從【訊息延時佇列】和【訊息順序佇列】中移除
  • 從【訊息延時佇列】取出下一條要到期的訊息,返回它的到期時間戳(如果佇列裡沒訊息返回空)。

我的理解就是初始化的時候

1是為了處理舊的訊息,比如生產者1傳送了訊息,然後時間沒到自己下線了,這時如果沒有其他使用者端線上,就沒有人能把資料從【訊息目標佇列】移到【訊息目標佇列】了。

2是返回的這個時間戳,會拿這個定時,等時間到了去【訊息目標佇列】拉去到期的訊息。

簡單總結就是這個方法是把到期訊息從【訊息延時佇列】放到【訊息目標佇列】裡,並且返回了最近要到期訊息的時間戳。

pushTask()

    private void pushTask() {
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.whenComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }

            if (res != null) {
                scheduleTask(res);
            }
        });
    }

這個程式碼看起來就比較簡單,呼叫了 pushTaskAsync() 獲取最近要到期訊息的時間戳(非同步封裝了一下)。

有異常的話就呼叫 scheduleTask() 五秒後再執行一次 pushTask()。

沒有異常的話如果有最近要到期訊息的時間戳(說明【訊息延時佇列】裡還有未到期訊息),用這個最新到期時間呼叫 scheduleTask(),在這個指定的時間呼叫 pushTask()。

這個方法簡單總結就是決定了要不要呼叫、什麼時候再呼叫 pushTask(),主要操作邏輯都在 pushTaskAsync() 裡(把到期的訊息從【訊息延時佇列】移到【訊息目標佇列】供消費端消費)。

瞭解了上面幾個方法的流程和含義,還記得一開頭提到的新增了兩個釋出訂閱的監聽器嗎?

1.當指定主題有新訂閱時呼叫 pushTask() 方法,裡面又會呼叫 pushTaskAsync() 方法

2.當指定主題有新訊息時呼叫 scheduleTask(startTime) 方法

需要注意的是,這裡會先訂閱指定主題,然後觸發執行 onSubscribe() 方法

在初始化延時佇列剛啟動的時候,處理到期舊資料:把到期的訊息從【訊息延時佇列】移到【訊息目標佇列】供消費端消費;處理新資料:獲取下次到期時間決定下次呼叫 pushTask() 的時間。

上面講的這種情況是站在當前使用者端的視角,但畢竟這是監聽訂閱資訊,如果啟動不止一個使用者端的話(就算是1個生產者1個消費者,也算兩個使用者端),總有一個使用者端的訂閱資訊回撥函數,會不會有問題?

仔細想想是沒有的,處理到期舊資料:之前啟動的使用者端已經處理完了;處理新資料:獲取最近到期時間,在 scheduleTask() 裡,如果之前有正在定時的任務,會把原來正在定時的任務取消掉。這個被取消的任務,時間要麼就是當前這個時間,要嘛是之後的時間,取消掉不會影響邏輯。

為了應對原本【訊息延時佇列】裡沒訊息了這種情況,流程結束了,重啟定時去呼叫 pushTask() ,把到期的訊息從【訊息延時佇列】移到【訊息目標佇列】供消費端消費。

總結

再放一下開頭的圖總體流程圖:

1.初始化延時佇列時會把【訊息延時佇列】裡的到期資料移動到【訊息目標佇列】,沒有也有可能;然後是找最近要到期的訊息時間,定時去拉,這個剛啟動也是可能沒有的,不過不要緊,這兩步是為了處理滯留在【訊息延時佇列】的舊資料(在傳送了延時訊息後,還沒到期時所有使用者端都下線了,這樣就沒人能把【訊息延時佇列】裡的到期資料移動到【訊息目標佇列】裡,就會出現這種情況);

最主要的還是設定了釋出訂閱監聽器,當有人傳送延時訊息的時候能收到通知,定時去將【訊息延時佇列】裡的到期資料移動到【訊息目標佇列】。

2.傳送延時訊息會先傳送到【訊息延時佇列】和【訊息順序佇列】,如果【訊息延時佇列】裡沒有資料,則將剛傳送的到期時間釋出到指定主題,提醒其他使用者端有新訊息。

3.初始化延時佇列時設定的釋出訂閱監聽器把【訊息延時佇列】裡的到期資料移動到【訊息目標佇列】裡。

4.獲取延遲訊息只需要執行 blpop 阻塞的獲取【訊息目標佇列】的訊息就可以了。

這裡回答開頭部分說的問題,到這看完了本文,你可以試著自己想一想這個問題的答案。

接收訊息程式碼如下,可以看到 delayedQueue 是沒有用到的,那麼為什麼要加這一行呢,這個後面總結部分回答。

public void consume() throws InterruptedException {
    String queuename = "delay-queue";
    RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
    RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
    String msg = blockingQueue.take();
    //收到訊息進行處理...
}

其實這個問題也是我開發過程中遇到的一個奇怪的地方,接收方程式碼沒有初始化延時佇列。

首先再囉嗦一句,初始化延時佇列的作用是會定時去把【訊息延時佇列】裡的到期資料移動到【訊息目標佇列】。

如果只有傳送方初始化延時佇列:

  • 傳送方傳送了延遲訊息,在到期之前下線了(它就不能把【訊息延時佇列】裡的到期資料移動到【訊息目標佇列】),而且沒有其他傳送方。
  • 接收方不管有多少個,都沒人能把【訊息延時佇列】裡的到期資料移動到【訊息目標佇列】。

所以接收方程式碼裡也初始化延時佇列能夠避免一部分資料丟失問題。

到此這篇關於Redisson 分散式延時佇列 RedissonDelayedQueue 執行流程的文章就介紹到這了,更多相關 Redisson RedissonDelayedQueue 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com