首頁 > 軟體

RxJava2 Scheduler使用範例深入解析

2022-10-26 14:02:55

前言

歡迎來到大家深入理解 RxJava2 系列第二篇,這裡先插上一句,本系列文章用的原始碼都是基於 RxJava 2.2.0 正式版。本篇文章將先與大家一起理解 Scheduler 與 Worker ,順著 RxJava2 的原始碼捋一下它們的實現原理。

Scheduler 與 Worker

Scheduler 與 Worker 在 RxJava2 中是一個非常重要的概念,他們是 RxJava 執行緒排程的核心與基石。用過的人肯定都會了解一些,但是想必瞭解 Worker 的讀者們就不多了。很多人會疑惑,既然有了 Scheduler 可以直接排程 Runnable,為何又強加一個 Worker 的概念,諸位稍安勿躁,跟著筆者的思路一起走下去。

定義

筆者這裡展示一下 Scheduler 最核心的定義部分:

public abstract class Scheduler {
    @NonNull
    public abstract Worker createWorker();
    public Disposable scheduleDirect(@NonNull Runnable run) {
        ...
    }
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        ...
    }
    @NonNull
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        ...
    }
    public abstract static class Worker implements Disposable {
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            ...
        }
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
        @NonNull
        public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
            ...
        }
    }
}

從上面的定義可以看出,Scheduler 本質上就是用來排程 Runnable 的,支援立即、延時和週期形式的呼叫,而 Worker 是任務的最小單元的載體。在 RxJava2 內部的實現中,通常一個或者多個 Worker 對應一個ScheduledThreadPoolExecutor物件,這些暫且不表。

scheduleDirect / schedulePeriodicallyDirect

在 RxJava 1.x 時代, Scheduler 是沒有scheduleDirect/schedulePeriodicallyDirect的,只能夠先createWorker,再通過 Worker 來排程任務。這些方法是對 Worker 呼叫的簡化,可以認為是建立了一個只能排程一次任務的 Worker 並立馬排程了該任務。在Scheduler基礎類別的原始碼中,也可以看出預設的實現是直接 createWorker 並建立對應的 Task 的(雖然在部分 Scheduler 覆蓋的實現上並沒有建立 Worker,但是可以認為存在虛擬的 Worker)。

createWorker

一個 Scheduler 可以建立多個 Worker,這兩者是一對多的關係,而 Worker 與 Task 也是一對多的關係。

如下圖所示:

Worke 的存在為了確保兩件事:

  • 同一個 Worker 建立的 Task 都會確保序列,且立即執行的任務符合先進先出原則。
  • Worker 繫結了呼叫了他的方法的 Runnable,當該 Worker 取消時,基於他的 Task 均被取消

因此當有操作符需要使用 Scheduler 時,可以通過 Worker 來將一系列的 Runnable 統一的排程和取消,最典型的例子就是observeOn,下面會詳細分析。

Schedulers

RxJava2 預設內建了幾種 Scheduler 的實現,適用於不同的場景,這些 Scheduler 均在 Schedulers 類中可以直接獲得

方法說明
Schedulers.computation()適用於計算密集型任務
Schedulers.io()適用於 IO 密集型任務
Schedulers.trampoline()在某個呼叫 schedule 的執行緒執行
Schedulers.newThread()每個 Worker 對應一個新執行緒
Schedulers.single()所有 Worker 使用同一個執行緒執行任務
Schedulers.from(Executor)使用 Executor 作為任務執行的執行緒

這裡我們挑選兩個最常用的 computation / io 原始碼稍作分析。

NewThreadWorker

NewThreadWorker 在 computation / io / newThread 均有涉及,我們先了解一下這個類。

上面筆者有提到過 Worker 與ScheduledThreadPoolExecutor 的關係,而這裡的NewThreadWorkerScheduledThreadPoolExecutor便是一對一的關係。在NewThreadWorker建構函式中會通過工廠方法建立一個corePoolSize 為 1 的ScheduledThreadPoolExecutor物件並持有之。

ScheduledThreadPoolExecutor 從 JDK1.5 開始存在,這個類繼承於 ThreadPoolExecutor,可以支援即使、延時和週期的任務。但是注意在ScheduledThreadPoolExecutor中 maximumPoolSize 引數是無效的,corePoolSize 表示其最大執行緒數,且它的佇列是無界的。這裡不再細說該類,否則涉及的就太多了。

有了這個類,RxJava2 實現 Worker 時便是站在了巨人的肩膀上,執行緒排程可以直接使用該類解決,略微麻煩之處就是封一層Disposable的邏輯。

具體細節讀者可以從原始碼一探究竟。

ComputationScheduler

作為計算密集型的 Scheduler,ComputationScheduler的執行緒數是與 CPU 核心密切相關的,原因是當執行緒數遠遠超過 CPU 核心數目時,CPU 的時間更多的損耗在了執行緒的上下文切換,因此比較通用的方式是保持最大執行緒數和 CPU 核心數一致。

最大執行緒數目

MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
static int cap(int cpuCount, int paramThreads) {
    return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}

從上面程式碼可見MAX_THREADS 大於 0,但是不超過 CPU 核心數,實際數值也受使用者設定的 System Properties 的影響。

FixedSchedulerPool

顧名思義,FixedSchedulerPool 可以認為是固定數目的真正的 Worker 的快取池。

確定了MAX_THREADS後,在ComputationScheduler的建構函式,會建立FixedSchedulerPool物件,FixedSchedulerPool 內部會直接建立一個長度為MAX_THREADSPoolWorker陣列。PoolWorker繼承自NewThreadWorker,但是沒有任何額外的程式碼。

static final class PoolWorker extends NewThreadWorker {
    PoolWorker(ThreadFactory threadFactory) {
        super(threadFactory);
    }
}

也就是說當FixedSchedulerPool建立時,已經有MAX_THREADS個 corePoolSize 為 1 的 ScheduledThreadPoolExecutor隨之建立。

PoolWorker

從使用角度來說,有了FixedSchedulerPool 好像就夠了,我們只需要每次createWorker時從池子裡取一個PoolWorker並返回即可。

但是這裡忽略了一個要點,每個 Worker 是獨立的,每個 Worker 內部的任務是繫結在這個 Worker 中的。如果按照上述的做法,暴露出去PoolWorker,會出現 2 個問題:

  • createWorker 會可能會返回相同的 Worker,導致這個 Worker 被 dispose 後,其內部所有的任務會被一併取消,而違背了不同 Worker 之間的任務的獨立性
  • PoolWorker也就是NewThreadWorker 被 dispose 後,其關聯的ScheduledThreadPoolExecutor被 shutdown,後續再次獲取該 Worker 也會導致無法建立任務

EventLoopWorker

為了解決上述的問題,我們需要在PoolWorker外再包一層,createWorker每次都會建立一個EventLoopWorker物件。

EventLoopWorker 其實是個代理物件,他會將 Runnable 代理給FixedSchedulerPool中取到的PoolWorker來排程,並且他會負責管理經由他建立的任務,當自身被取消時,會將建立的任務統統取消。

示意圖

IoScheduler

與 ComputationScheduler 恰恰相反,IO 密集型的 Scheduler 執行緒數是無上限的。這是因為 IO 裝置的速度是遠遠低於 CPU 速度的,在等待 IO 操作時, CPU 往往是閒置的,因此應該建立更多的執行緒讓 CPU 儘可能的利用。當然並不是說執行緒越多越好,執行緒數目膨脹到一定程度既會影響 CPU 的效率,也會消耗大量的記憶體。在IoScheduler中,每個 Worker 在空置一段時間後就會被清除以控制執行緒的數目。

CachedWorkerPool

CachedWorkerPool是一個變長並定期清理的ThreadWorker的快取池,內部通過一個ConcurrentLinkedQueue維護。和PoolWorker類似,ThreadWorker也是繼承自NewThreadWorker

static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;
    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }
    public long getExpirationTime() {
        return expirationTime;
    }
    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}

僅僅是增加了一個expirationTime欄位,用來標識這個ThreadWorker的超時時間。

於此同時,在CachedWorkerPool初始化時會傳入 Worker 的超時時間,目前是寫死的 60 秒。這個超時時間表示ThreadWorker閒置後最大存活時間(實際中不保證 60 秒時被回收)。

EventLoopWorker

IoScheduler中也存在一個EventLoopWorker類,它和ComputationScheduler中的作用也是類似的:

  • 管理自身排程過的任務
  • 管理ThreadWorker,使其可被回收再次使用

Worker 的管理

  • 建立:在閒置佇列中查詢ThreadWorker,如果存在則取出,否則new``一個新的ThreadWorker,最後在外面包一層EventLoopWorker```並返回。
  • 回收:當EventLoopWorker dispose 後,會更新內部的ThreadWorker超時時間,並促使CachedWorkerPoolThreadWorker加入閒置佇列
  • 清理:CachedWorkerPool在初始化時啟動定時任務,每隔 60 秒清理佇列中超時的ThreadWorker

這裡說個細節,因為CachedWorkerPool是每隔 60 秒清理一次佇列的,因此ThreadWorker的存活時間取決於入隊的時機,如果一直沒有被再次取出,其被實際清理的延遲在 60 - 120 秒之間,有興趣的讀者可以想一想為什麼。

示意圖

對比

熟悉執行緒的讀者朋友們會發現,ComputationSchedulerIoScheduler很像某些引數下的ThreadPoolExecutor

ThreadPoolExecutor 引數ComputationScheduler(n)IoScheduler
corePoolSizen0
maximumPoolSizenInteger.MAX_VALUE
keepAliveTime060
unit-TimeUnit.SECONDS
workQueueLinkedBlockingQueueSynchronousQueue

他們對執行緒的控制外在的表現很相似。 但是實際的執行緒執行物件不一樣:

  • ThreadPoolExecutor:Thread
  • Scheduler:支援立即、延遲、定時排程任務的物件,通常為 ScheduledThreadPoolExecutor(coreSize = 1)

這兩者的對比有助於我們更加深刻地理解 Scheduler 設計的內在邏輯。

結語

Scheduler 是 RxJava 執行緒的核心概念,RxJava 基於此遮蔽了 Thread 相關的概念,只與 Scheduler / Worker / Runnable 打交道。

以上就是RxJava2 Scheduler使用範例深入解析的詳細內容,更多關於RxJava2 Scheduler使用的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com