<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
歡迎來到大家深入理解 RxJava2 系列第二篇,這裡先插上一句,本系列文章用的原始碼都是基於 RxJava 2.2.0 正式版。本篇文章將先與大家一起理解 Scheduler 與 Worker ,順著 RxJava2 的原始碼捋一下它們的實現原理。
Scheduler 與 Worker 在 RxJava2 中是一個非常重要的概念,他們是 RxJava 執行緒排程的核心與基石。用過的人肯定都會了解一些,但是想必瞭解 Worker 的讀者們就不多了。很多人會疑惑,既然有了 Scheduler 可以直接排程 Runnable,為何又強加一個 Worker 的概念,諸位稍安勿躁,跟著筆者的思路一起走下去。
筆者這裡展示一下 Scheduler 最核心的定義部分:
public abstract class Scheduler { @NonNull public abstract Worker createWorker(); public Disposable scheduleDirect(@NonNull Runnable run) { ... } public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { ... } @NonNull public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { ... } public abstract static class Worker implements Disposable { @NonNull public Disposable schedule(@NonNull Runnable run) { ... } @NonNull public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit); @NonNull public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) { ... } } }
從上面的定義可以看出,Scheduler 本質上就是用來排程 Runnable 的,支援立即、延時和週期形式的呼叫,而 Worker 是任務的最小單元的載體。在 RxJava2 內部的實現中,通常一個或者多個 Worker 對應一個ScheduledThreadPoolExecutor
物件,這些暫且不表。
在 RxJava 1.x 時代, Scheduler 是沒有scheduleDirect/schedulePeriodicallyDirect
的,只能夠先createWorker
,再通過 Worker 來排程任務。這些方法是對 Worker 呼叫的簡化,可以認為是建立了一個只能排程一次任務的 Worker 並立馬排程了該任務。在Scheduler
基礎類別的原始碼中,也可以看出預設的實現是直接 createWorker 並建立對應的 Task 的(雖然在部分 Scheduler 覆蓋的實現上並沒有建立 Worker,但是可以認為存在虛擬的 Worker)。
一個 Scheduler 可以建立多個 Worker,這兩者是一對多的關係,而 Worker 與 Task 也是一對多的關係。
如下圖所示:
Worke 的存在為了確保兩件事:
因此當有操作符需要使用 Scheduler 時,可以通過 Worker 來將一系列的 Runnable 統一的排程和取消,最典型的例子就是observeOn
,下面會詳細分析。
RxJava2 預設內建了幾種 Scheduler 的實現,適用於不同的場景,這些 Scheduler 均在 Schedulers 類中可以直接獲得
方法 | 說明 |
---|---|
Schedulers.computation() | 適用於計算密集型任務 |
Schedulers.io() | 適用於 IO 密集型任務 |
Schedulers.trampoline() | 在某個呼叫 schedule 的執行緒執行 |
Schedulers.newThread() | 每個 Worker 對應一個新執行緒 |
Schedulers.single() | 所有 Worker 使用同一個執行緒執行任務 |
Schedulers.from(Executor) | 使用 Executor 作為任務執行的執行緒 |
這裡我們挑選兩個最常用的 computation / io 原始碼稍作分析。
NewThreadWorker 在 computation / io / newThread 均有涉及,我們先了解一下這個類。
上面筆者有提到過 Worker 與ScheduledThreadPoolExecutor
的關係,而這裡的NewThreadWorker
與ScheduledThreadPoolExecutor
便是一對一的關係。在NewThreadWorker
建構函式中會通過工廠方法建立一個corePoolSize 為 1 的ScheduledThreadPoolExecutor
物件並持有之。
ScheduledThreadPoolExecutor
從 JDK1.5 開始存在,這個類繼承於 ThreadPoolExecutor
,可以支援即使、延時和週期的任務。但是注意在ScheduledThreadPoolExecutor
中 maximumPoolSize 引數是無效的,corePoolSize 表示其最大執行緒數,且它的佇列是無界的。這裡不再細說該類,否則涉及的就太多了。
有了這個類,RxJava2 實現 Worker 時便是站在了巨人的肩膀上,執行緒排程可以直接使用該類解決,略微麻煩之處就是封一層Disposable
的邏輯。
具體細節讀者可以從原始碼一探究竟。
作為計算密集型的 Scheduler,ComputationScheduler
的執行緒數是與 CPU 核心密切相關的,原因是當執行緒數遠遠超過 CPU 核心數目時,CPU 的時間更多的損耗在了執行緒的上下文切換,因此比較通用的方式是保持最大執行緒數和 CPU 核心數一致。
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0)); static int cap(int cpuCount, int paramThreads) { return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads; }
從上面程式碼可見MAX_THREADS
大於 0,但是不超過 CPU 核心數,實際數值也受使用者設定的 System Properties 的影響。
顧名思義,FixedSchedulerPool
可以認為是固定數目的真正的 Worker 的快取池。
確定了MAX_THREADS
後,在ComputationScheduler
的建構函式,會建立FixedSchedulerPool
物件,FixedSchedulerPool
內部會直接建立一個長度為MAX_THREADS
的PoolWorker
陣列。PoolWorker
繼承自NewThreadWorker
,但是沒有任何額外的程式碼。
static final class PoolWorker extends NewThreadWorker { PoolWorker(ThreadFactory threadFactory) { super(threadFactory); } }
也就是說當FixedSchedulerPool
建立時,已經有MAX_THREADS
個 corePoolSize 為 1 的 ScheduledThreadPoolExecutor
隨之建立。
從使用角度來說,有了FixedSchedulerPool
好像就夠了,我們只需要每次createWorker
時從池子裡取一個PoolWorker
並返回即可。
但是這裡忽略了一個要點,每個 Worker 是獨立的,每個 Worker 內部的任務是繫結在這個 Worker 中的。如果按照上述的做法,暴露出去PoolWorker
,會出現 2 個問題:
PoolWorker
也就是NewThreadWorker
被 dispose 後,其關聯的ScheduledThreadPoolExecutor
被 shutdown,後續再次獲取該 Worker 也會導致無法建立任務為了解決上述的問題,我們需要在PoolWorker
外再包一層,createWorker
每次都會建立一個EventLoopWorker
物件。
EventLoopWorker
其實是個代理物件,他會將 Runnable 代理給FixedSchedulerPool
中取到的PoolWorker
來排程,並且他會負責管理經由他建立的任務,當自身被取消時,會將建立的任務統統取消。
與 ComputationScheduler 恰恰相反,IO 密集型的 Scheduler 執行緒數是無上限的。這是因為 IO 裝置的速度是遠遠低於 CPU 速度的,在等待 IO 操作時, CPU 往往是閒置的,因此應該建立更多的執行緒讓 CPU 儘可能的利用。當然並不是說執行緒越多越好,執行緒數目膨脹到一定程度既會影響 CPU 的效率,也會消耗大量的記憶體。在IoScheduler
中,每個 Worker 在空置一段時間後就會被清除以控制執行緒的數目。
CachedWorkerPool
是一個變長並定期清理的ThreadWorker
的快取池,內部通過一個ConcurrentLinkedQueue
維護。和PoolWorker
類似,ThreadWorker
也是繼承自NewThreadWorker
:
static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } public long getExpirationTime() { return expirationTime; } public void setExpirationTime(long expirationTime) { this.expirationTime = expirationTime; } }
僅僅是增加了一個expirationTime
欄位,用來標識這個ThreadWorker
的超時時間。
於此同時,在CachedWorkerPool
初始化時會傳入 Worker 的超時時間,目前是寫死的 60 秒。這個超時時間表示ThreadWorker
閒置後最大存活時間(實際中不保證 60 秒時被回收)。
IoScheduler
中也存在一個EventLoopWorker
類,它和ComputationScheduler
中的作用也是類似的:
ThreadWorker
,使其可被回收再次使用ThreadWorker
,如果存在則取出,否則new``一個新的
ThreadWorker,最後在外面包一層EventLoopWorker```並返回。EventLoopWorker
dispose 後,會更新內部的ThreadWorker
超時時間,並促使CachedWorkerPool
將ThreadWorker
加入閒置佇列CachedWorkerPool
在初始化時啟動定時任務,每隔 60 秒清理佇列中超時的ThreadWorker
這裡說個細節,因為CachedWorkerPool
是每隔 60 秒清理一次佇列的,因此ThreadWorker
的存活時間取決於入隊的時機,如果一直沒有被再次取出,其被實際清理的延遲在 60 - 120 秒之間,有興趣的讀者可以想一想為什麼。
熟悉執行緒的讀者朋友們會發現,ComputationScheduler
與IoScheduler
很像某些引數下的ThreadPoolExecutor
。
ThreadPoolExecutor 引數 | ComputationScheduler(n) | IoScheduler |
---|---|---|
corePoolSize | n | 0 |
maximumPoolSize | n | Integer.MAX_VALUE |
keepAliveTime | 0 | 60 |
unit | - | TimeUnit.SECONDS |
workQueue | LinkedBlockingQueue | SynchronousQueue |
他們對執行緒的控制外在的表現很相似。 但是實際的執行緒執行物件不一樣:
這兩者的對比有助於我們更加深刻地理解 Scheduler 設計的內在邏輯。
Scheduler 是 RxJava 執行緒的核心概念,RxJava 基於此遮蔽了 Thread 相關的概念,只與 Scheduler / Worker / Runnable 打交道。
以上就是RxJava2 Scheduler使用範例深入解析的詳細內容,更多關於RxJava2 Scheduler使用的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45