首頁 > 軟體

Android Dispatchers.IO執行緒池深入刨析

2022-08-25 22:00:54

一. Dispatchers.IO

1.Dispatchers.IO

在協程中,當需要執行IO任務時,會在上下文中指定Dispatchers.IO來進行執行緒的切換排程。 而IO實際上是CoroutineDispatcher型別的物件,實際的值為DefaultScheduler類的常數物件IO,程式碼如下:

public actual object Dispatchers {
    ...
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

2.DefaultScheduler類

DefaultScheduler類繼承自ExperimentalCoroutineDispatcher類,內部提供了型別為LimitingDispatcher的IO物件,程式碼如下:

// 系統設定變數
public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.parallelism"
...
// 表示不會阻塞的任務,純CPU任務
internal const val TASK_NON_BLOCKING = 0
// 表示執行過程中可能會阻塞的任務,非純CPU任務
internal const val TASK_PROBABLY_BLOCKING = 1
...
// 預設執行緒池名稱
internal const val DEFAULT_DISPATCHER_NAME = "Dispatchers.Default"
...
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    // 建立名為Dispatchers.IO的執行緒池
    // 最大並行數量為kotlinx.coroutines.io.parallelism指定的值,預設為64與CPU數量中的較大者
    // 預設的執行的任務型別為TASK_PROBABLY_BLOCKING
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
    override fun close() {
        throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed")
    }
    // 可以看出IO和Default共用一個執行緒池
    override fun toString(): String = DEFAULT_DISPATCHER_NAME
    @InternalCoroutinesApi
    @Suppress("UNUSED")
    public fun toDebugString(): String = super.toString()
}

3.LimitingDispatcher類

LimitingDispatcher類繼承自ExecutorCoroutineDispatcher類,實現了TaskContext介面和Executor介面。

LimitingDispatcher類的核心是構造方法中型別為ExperimentalCoroutineDispatcher的dispatcher物件。

LimitingDispatcher類看起來是一個標準的執行緒池,但實際上LimitingDispatcher類只對類引數中傳入的dispatcher進行包裝和功能擴充套件。如同名字中的litmit一樣,LimitingDispatcher類主要用於對任務執行數量進行限制,程式碼如下:

// dispatcher引數傳入了DefaultScheduler物件
// parallelism表示並行執行的任務數量
// name表示執行緒池的名字
// taskMode表示任務模式,TaskContext介面中的常數
private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
    // 用於儲存任務的佇列
    private val queue = ConcurrentLinkedQueue<Runnable>()
    // 用於記錄當前正在執行的任務的數量
    private val inFlightTasks = atomic(0)
    // 獲取當前執行緒池
    override val executor: Executor
        get() = this
    // Executor介面的實現,執行緒池的核心方法,通過dispatch實現
    override fun execute(command: Runnable) = dispatch(command, false)
    override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher")
    // CoroutineDispatcher介面的實現
    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
    // 任務分發的核心方法
    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        // 獲取當前要執行的任務
        var taskToSchedule = block
        // 死迴圈
        while (true) {
            // 當前執行的任務數加一,也可理解生成生成當前要執行的任務的編號
            val inFlight = inFlightTasks.incrementAndGet()
            // 如果當前需要執行的任務數小於允許的並行執行任務數量,說明可以執行,
            if (inFlight <= parallelism) {
                // 呼叫引數中的dispatcher物件,執行任務
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                // 返回,退出迴圈
                return
            }
            // 如果達到的最大並行數的限制,則將任務加入到佇列中
            queue.add(taskToSchedule)
            // 下面的程式碼防止執行緒競爭導致任務卡在佇列裡不被執行,case如下:
            // 執行緒1:inFlightTasks = 1 ,執行任務
            // 執行緒2:inFlightTasks = 2,當前達到了parallelism限制,
            // 執行緒1:執行結束,inFlightTasks = 1
            // 執行緒2:將任務新增到佇列裡,執行結束,inFlightTasks = 0
            // 由於未執行,因此這裡當前執行的任務數先減一
            // 減一後如果仍然大於等於在大並行數,則直接返回,退出迴圈
            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }
            // 如果減一後,發現可以執行任務,則從隊首獲取任務,進行下一次迴圈
            // 如果佇列為空,說明沒有任務,則返回,退出迴圈
            taskToSchedule = queue.poll() ?: return
        }
    }
    // CoroutineDispatcher介面的實現,用於yield掛起協程時的排程處理
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        // 也是通過dispatch方法實現,注意這裡tailDispatch引數為true
        dispatch(block, tailDispatch = true)
    }
    override fun toString(): String {
        return name ?: "${super.toString()}[dispatcher = $dispatcher]"
    }
    // TaskContext介面的實現,用於在一個任務執行完進行回撥
    override fun afterTask() {
        // 從隊首獲取一個任務
        var next = queue.poll()
        // 若可以獲取到
        if (next != null) {
            // 則執行任務,注意這裡tailDispatch引數為true
            dispatcher.dispatchWithContext(next, this, true)
            // 返回
            return
        }
        // 任務執行完畢,當前執行的任務數量減一
        inFlightTasks.decrementAndGet()
        // 下面的程式碼防止執行緒競爭導致任務卡在佇列裡不被執行,case如下:
        // 執行緒1:inFlightTasks = 1 ,執行任務
        // 執行緒2:inFlightTasks = 2
        // 執行緒1:執行結束,執行afterTask方法,發現佇列為空,此時inFlightTasks = 2
        // 執行緒2:inFlightTasks當前達到了parallelism限制,
        //      將任務加入到佇列中,執行結束,inFlightTasks = 1
        // 執行緒1:inFlightTasks=1,執行結束
        // 從佇列中取出任務,佇列為空則返回
        next = queue.poll() ?: return
        // 執行任務,注意這裡tailDispatch引數為true
        dispatch(next, true)
    }
}

dispatcher的dispatch方法定義在ExperimentalCoroutineDispatcher類中。

4.ExperimentalCoroutineDispatcher類

ExperimentalCoroutineDispatcher類繼承自ExecutorCoroutineDispatcher類,程式碼如下:

// corePoolSize執行緒池核心執行緒數
// maxPoolSize表示執行緒池最大執行緒數
// schedulerName表示內部協程排程器的名字
// idleWorkerKeepAliveNs表示空閒的執行緒存活時間
@InternalCoroutinesApi
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    // 我們在DefaultScheduler類中就是通過預設的構造方法,
    // 建立的父類別ExperimentalCoroutineDispatcher物件
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
    ...
    // 建立coroutineScheduler物件
    private var coroutineScheduler = createScheduler()
    // 核心的分發方法
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            // 呼叫coroutineScheduler物件的dispatch方法
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            // 只有當coroutineScheduler正在關閉時,才會拒絕執行,丟擲異常
            DefaultExecutor.dispatch(context, block)
        }
    ...
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    ...
}
// 核心執行緒數
@JvmField
internal val CORE_POOL_SIZE = systemProp(
    "kotlinx.coroutines.scheduler.core.pool.size",
    AVAILABLE_PROCESSORS.coerceAtLeast(2), // !!! at least two here
    minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)
// 最大執行緒數
@JvmField
internal val MAX_POOL_SIZE = systemProp(
    "kotlinx.coroutines.scheduler.max.pool.size",
    (AVAILABLE_PROCESSORS * 128).coerceIn(
        CORE_POOL_SIZE,
        CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
    ),
    maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
)
// 空閒執行緒的存活時間
@JvmField
internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
    systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 60L)
)

在ExperimentalCoroutineDispatcher類的dispatch方法內部,通過呼叫型別為CoroutineScheduler的物件的dispatch方法實現。

二.CoroutineScheduler類

1.CoroutineScheduler類的繼承關係

在對CoroutineScheduler類的dispatch方法分析之前,首先分析一下CoroutineScheduler類的繼承關係,程式碼如下:

// 實現了Executor和Closeable介面
// corePoolSize執行緒池核心執行緒數
// maxPoolSize表示執行緒池最大執行緒數
// schedulerName表示內部協程排程器的名字
// idleWorkerKeepAliveNs表示空閒的執行緒存活時間
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    init {
        // 核心執行緒數量必須大於等於MIN_SUPPORTED_POOL_SIZE
        require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
            "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
        }
        // 最大執行緒數量必須大於等於核心執行緒數量
        require(maxPoolSize >= corePoolSize) {
            "Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
        }
        // 最大執行緒數量必須小於等於MAX_SUPPORTED_POOL_SIZE
        require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
            "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
        }
        // 空閒的執行緒存活時間必須大於0
        require(idleWorkerKeepAliveNs > 0) {
            "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
        }
    }
    ...
    // Executor介面中的實現,通過dispatch方法實現
    override fun execute(command: Runnable) = dispatch(command)
    // Closeable介面中的實現,通過shutdown方法實現
    override fun close() = shutdown(10_000L)
    ...
}

2.CoroutineScheduler類的全域性變數

接下來對CoroutineScheduler類中重要的全域性變數進行分析,程式碼如下:

// 用於儲存全域性的純CPU(不阻塞)任務
@JvmField
val globalCpuQueue = GlobalQueue()
// 用於儲存全域性的執行非純CPU(可能阻塞)任務
@JvmField
val globalBlockingQueue = GlobalQueue()
...
// 用於記錄當前處於Parked狀態(一段時間後自動終止)的執行緒的數量
private val parkedWorkersStack = atomic(0L)
...
// 用於儲存當前執行緒池中的執行緒
// workers[0]永遠為null,作為哨兵位
// index從1到maxPoolSize為有效執行緒
@JvmField
val workers = AtomicReferenceArray<Worker?>(maxPoolSize + 1)
...
// 控制狀態
private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
// 表示已經建立的執行緒的數量
private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
// 表示可以獲取的CPU令牌數量,初始值為執行緒池核心執行緒數量
private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
// 獲取指定的狀態的已經建立的執行緒的數量
private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
// 獲取指定的狀態的執行阻塞任務的數量
private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
// 獲取指定的狀態的CPU令牌數量
public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
// 當前已經建立的執行緒數量加1
private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
// 當前已經建立的執行緒數量減1
private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())
// 當前執行阻塞任務的執行緒數量加1
private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)
// 當前執行阻塞任務的執行緒數量減1
private inline fun decrementBlockingTasks() {
    controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
}
// 嘗試獲取CPU令牌
private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
    val available = availableCpuPermits(state)
    if (available == 0) return false
    val update = state - (1L shl CPU_PERMITS_SHIFT)
    if (controlState.compareAndSet(state, update)) return true
}
// 釋放CPU令牌
private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)
// 表示當前執行緒池是否關閉
private val _isTerminated = atomic(false)
val isTerminated: Boolean get() = _isTerminated.value
companion object {
    // 用於標記一個執行緒是否在parkedWorkersStack中(處於Parked狀態)
    @JvmField
    val NOT_IN_STACK = Symbol("NOT_IN_STACK")
    // 執行緒的三個狀態
    // CLAIMED表示執行緒可以執行任務
    // PARKED表示執行緒暫停執行任務,一段時間後會自動進入終止狀態
    // TERMINATED表示執行緒處於終止狀態
    private const val PARKED = -1
    private const val CLAIMED = 0
    private const val TERMINATED = 1
    // 以下五個常數為掩碼
    private const val BLOCKING_SHIFT = 21 // 2x1024x1024
    // 1-21位
    private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
    // 22-42位
    private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
    // 42
    private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
    // 43-63位
    private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT
    // 以下兩個常數用於require中引數判斷
    internal const val MIN_SUPPORTED_POOL_SIZE = 1
    // 2x1024x1024-2
    internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
    // parkedWorkersStack的掩碼
    private const val PARKED_INDEX_MASK = CREATED_MASK
    // inv表示01反轉
    private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
    private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
}

CoroutineScheduler類中對執行緒的狀態與許可權控制:

availableCpuPermits的初始值為引數中核心執行緒數corePoolSize的值,表示CoroutineScheduler類中最多隻有corePoolSize個核心執行緒。執行純CPU任務的執行緒每次執行任務之前需要在availableCpuPermits中進行記錄與申請。blockingTasks表示執行非純CPU任務的數量。這部分執行緒在執行時不需要CPU令牌。createdWorkers表示當前執行緒池中所有執行緒的數量,每個執行緒在建立或終止時都需要通過在這裡進行記錄。這些變數的具體關係如下:

createdWorkers = blockingTasks + corePoolSize - availableCpuPermits

CPU令牌是執行緒池自定義的概念,不代表時間片,只是為了保證核心執行緒的數量。

三.Worker類與WorkerState類

在分析CoroutineScheduler類的dispatch方法之前,還需要分析一下CoroutineScheduler類中的兩個重要的內部類Worker類以及其對應的狀態類WorkerState類。

Worker是一個執行緒池中任務的核心執行者,幾乎在所有的執行緒池中都存在Worker的概念。

1.WorkerState類

首先分析一下WorkerState類,程式碼如下:

// 一個列舉類,表示Worker的狀態
enum class WorkerState {
    // 擁有了CPU令牌,可以執行純CPU任務,也可以執行非純CPU任務
    CPU_ACQUIRED,
    // 可以執行非純CPU任務
    BLOCKING,
    // 當前已經暫停,一段時間後將終止,也有可能被再次使用
    PARKING,
    // 休眠狀態,用於初始狀態,只能執行自己本地任務
    DORMANT,
    // 終止狀態,將不再被使用
    TERMINATED
}

2.Worker類的繼承關係與全域性變數

接下來對Worker類的繼承關係以及其中重要的全域性變數進行分析,程式碼如下:

// 繼承自Thread類
// 私有化無參的構造方法
internal inner class Worker private constructor() : Thread() {
    init {
        // 標記為守護執行緒
        isDaemon = true
    }
    // 當前執行緒在儲存執行緒池執行緒的陣列workers中的索引位置
    @Volatile
    var indexInArray = 0
        set(index) {
            // 設定執行緒名
            name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
            field = index
        }
    // 構造方法
    constructor(index: Int) : this() {
        indexInArray = index
    }
    // 獲取當前執行緒的排程器
    inline val scheduler get() = this@CoroutineScheduler
    // 執行緒儲存任務的本地佇列
    @JvmField
    val localQueue: WorkQueue = WorkQueue()
    // 執行緒的狀態 (內部轉換)
    @JvmField
    var state = WorkerState.DORMANT
    // 執行緒的控制狀態(外部賦予)
    val workerCtl = atomic(CLAIMED)
    // 終止截止時間,表示處於PARKING狀態的執行緒,在terminationDeadline毫秒後終止
    private var terminationDeadline = 0L
    // 表示當執行緒處於PARKING狀態,進入parkedWorkersStack後,
    // 下一個處於PARKING狀態並進入parkedWorkersStack的執行緒的參照
    @Volatile
    var nextParkedWorker: Any? = NOT_IN_STACK
    // 偷取其他執行緒的本地佇列的任務的冷卻時間,後面會解釋
    private var minDelayUntilStealableTaskNs = 0L
    // 生成亂數,配合演演算法,用於任務尋找
    private var rngState = Random.nextInt()
    ...
    // 表示當前執行緒的本地佇列是否有任務
    @JvmField
    var mayHaveLocalTasks = false
    ...
}

3.Worker類的run方法

接下來分析Worker類的核心方法——run方法的實現,程式碼入下:

override fun run() = runWorker()
private fun runWorker() {
    // 用於配合minDelayUntilStealableTaskNs自旋
    var rescanned = false
    // 執行緒池未關閉,執行緒沒有終止,則迴圈
    while (!isTerminated && state != WorkerState.TERMINATED) {
        // 尋找並獲取任務
        val task = findTask(mayHaveLocalTasks)
        // 如果找到了任務
        if (task != null) {
            // 重製兩個變數
            rescanned = false
            minDelayUntilStealableTaskNs = 0L
            // 執行任務
            executeTask(task)
            // 繼續迴圈
            continue
        } else { // 如果沒有找到任務,說明本地佇列肯定沒有任務,因為本地佇列優先查詢
            // 設定標誌位
            mayHaveLocalTasks = false
        }
        // 走到這裡,說明沒有找到任務
        // 如果偷取任務的冷卻時間不為0,說明之前偷到過任務
        if (minDelayUntilStealableTaskNs != 0L) {
            // 這裡通過rescanned,首次minDelayUntilStealableTaskNs不為0,
            // 不會立刻進入PARKING狀態,而是再次去尋找任務
            // 因為當過多的執行緒進入PARKING狀態,再次喚起大量的執行緒很難控制
            if (!rescanned) {
                rescanned = true
            } else {// 再次掃描,仍然沒有找到任務
                // 置位
                rescanned = false
                // 嘗試釋放CPU令牌,並進入WorkerState.PARKING狀態
                tryReleaseCpu(WorkerState.PARKING)
                // 清除中斷標誌位
                interrupted()
                // 阻塞minDelayUntilStealableTaskNs毫秒
                LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                // 清零
                minDelayUntilStealableTaskNs = 0L
            }
            // 阻塞完成後繼續執行
            continue
        }
        // 走到這裡,說明執行緒可能很長時間都沒有執行任務了,則對其進行暫停處理
        // tryPark比tryReleaseCpu要嚴格的多,會被執行緒會被計入到parkedWorkersStack,
        // 同時會修改workerCtl狀態
        tryPark()
    }
    // 退出迴圈
    // 嘗試釋放CPU令牌,並進入終止狀態
    tryReleaseCpu(WorkerState.TERMINATED)
}

4.Worker類的任務尋找機制

接下來分析Worker執行緒如何尋找任務,程式碼如下:

// 尋找任務
fun findTask(scanLocalQueue: Boolean): Task? {
    // 嘗試獲取CPU令牌,如果獲取到了,則呼叫findAnyTask方法,尋找任務
    if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
    // 如果沒有獲取到CPU令牌,只能去找非純CPU任務了
    // 如果允許掃描原生的任務佇列,則優先在本地佇列中尋找,
    // 找不到則在全域性佇列中尋找,從隊首中獲取
    val task = if (scanLocalQueue) {
        localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
    } else {
        globalBlockingQueue.removeFirstOrNull()
    }
    // 如果在本地佇列和全域性佇列中都找不到,則嘗試去其他執行緒的佇列裡偷一個任務
    return task ?: trySteal(blockingOnly = true)
}
// 尋找CPU任務
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
    // 如果允許掃描原生的任務佇列,則在本地佇列和全域性佇列中隨機二選一,
    // 找不到則在全域性佇列中尋找,從隊首中獲取
    if (scanLocalQueue) {
        // 隨機確定本地佇列和全域性佇列的優先順序
        val globalFirst = nextInt(2 * corePoolSize) == 0
        // 獲取任務
        if (globalFirst) pollGlobalQueues()?.let { return it }
        localQueue.poll()?.let { return it }
        if (!globalFirst) pollGlobalQueues()?.let { return it }
    } else {
        // 只能從全域性獲取
        pollGlobalQueues()?.let { return it }
    }
    // 走到這裡,說明本地佇列和全域性佇列中都找不到
    // 那麼就嘗試去其他執行緒的佇列裡偷一個任務
    return trySteal(blockingOnly = false)
}
// 從全域性佇列獲取任務
private fun pollGlobalQueues(): Task? {
    // 隨機獲取CPU任務或者非CPU任務
    if (nextInt(2) == 0) {
        // 優先獲取CPU任務
        globalCpuQueue.removeFirstOrNull()?.let { return it }
        return globalBlockingQueue.removeFirstOrNull()
    } else {
        // 優先獲取非CPU任務
        globalBlockingQueue.removeFirstOrNull()?.let { return it }
        return globalCpuQueue.removeFirstOrNull()
    }
}
// 偷取其他執行緒的本地佇列的任務
// blockingOnly表示是否只偷取阻塞任務
private fun trySteal(blockingOnly: Boolean): Task? {
    // 只有當前執行緒的本地佇列為空的時候,才能偷其他執行緒的本地佇列
    assert { localQueue.size == 0 }
    // 獲取已經存在的執行緒的數量
    val created = createdWorkers
    // 如果執行緒總數為0或1,則不偷取,直接返回
    // 0:需要等待初始化
    // 1:避免在單執行緒機器上過度偷取
    if (created < 2) {
        return null
    }
    // 隨機生成一個存在的執行緒索引
    var currentIndex = nextInt(created)
    // 預設的偷取冷卻時間
    var minDelay = Long.MAX_VALUE
    // 迴圈遍歷
    repeat(created) {
        // 每次迴圈索引自增,帶著下一行程式碼錶示,從位置currentIndex開始偷
        ++currentIndex
        // 如果超出了,則從頭繼續
        if (currentIndex > created) currentIndex = 1
        // 從陣列中獲取執行緒
        val worker = workers[currentIndex]
        // 如果執行緒不為空,並且不是自己
        if (worker !== null && worker !== this) {   
            assert { localQueue.size == 0 }
            // 根據偷取的型別進行偷取
            val stealResult = if (blockingOnly) {
                // 偷取非CPU任務到本地佇列中
                localQueue.tryStealBlockingFrom(victim = worker.localQueue)
            } else {
                // 偷取任務到本地佇列中
                localQueue.tryStealFrom(victim = worker.localQueue)
            }
            // 如果返回值為TASK_STOLEN,說明偷到了
            // 如果返回值為NOTHING_TO_STEAL,說明要偷的執行緒的本地佇列是空的
            if (stealResult == TASK_STOLEN) {
                // 從佇列的隊首拿出來返回
                return localQueue.poll()
            // 如果返回值大於零,表示偷取的冷卻時間,說明沒有偷到  
            } else if (stealResult > 0) { // 說明至少還要等待stealResult時間才能偷取這個任務
                // 計算偷取冷卻時間
                minDelay = min(minDelay, stealResult)
            }
        }
    }
    // 設定偷取等待時間
    minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
    // 返回空
    return null
}
// 基於Marsaglia xorshift RNG演演算法
// 用於在2^32-1範圍內計算偷取目標
internal fun nextInt(upperBound: Int): Int {
    var r = rngState
    r = r xor (r shl 13)
    r = r xor (r shr 17)
    r = r xor (r shl 5)
    rngState = r
    val mask = upperBound - 1
    // Fast path for power of two bound
    if (mask and upperBound == 0) {
        return r and mask
    }
    return (r and Int.MAX_VALUE) % upperBound
}

通過對這部分程式碼的分析,可以知道執行緒在尋找任務時,首先會嘗試獲取CPU令牌,成為核心執行緒。如果執行緒成為了核心執行緒,則隨機從本地或全域性的兩個佇列中獲取一個任務,獲取不到則去隨機偷取一個任務。如果沒有獲取到CPU令牌,則優先在本地獲取任務,獲取不到則在全域性非CPU任務佇列中獲取任務,獲取不到則去偷取一個非CPU任務。

如果偷取的任務沒有達到最小的可偷取時間,則返回需要等待的時間。如果偷取任務成功,則直接加入到本地佇列中。偷取的核心過程,會在後面進行分析。

5.Worker類的任務執行機制

接下來分析任務被獲取到後如何被執行,程式碼如下:

// 執行任務
private fun executeTask(task: Task) {
    // 獲取任務型別,型別為純CPU或可能阻塞
    val taskMode = task.mode
    // 重置執行緒閒置狀態
    idleReset(taskMode)
    // 任務執行前
    beforeTask(taskMode)
    // 執行任務
    runSafely(task)
    // 任務執行後
    afterTask(taskMode)
}
// 重置執行緒閒置狀態
private fun idleReset(mode: Int) {
    // 重置從PARKING狀態到TERMINATED狀態的時間
    terminationDeadline = 0L
    // 如果當前狀態為PARKING,說明尋找任務時沒有獲取到CPU令牌
    if (state == WorkerState.PARKING) {
        assert { mode == TASK_PROBABLY_BLOCKING }
        // 設定狀態為BLOCKING
        state = WorkerState.BLOCKING
    }
}
// 任務執行前
private fun beforeTask(taskMode: Int) {
    // 如果執行的任務為純CPU任務,說明當前執行緒獲取到了CPU令牌,是核心執行緒,直接返回
    if (taskMode == TASK_NON_BLOCKING) return
    // 走到這裡,說明執行緒執行的是非純CPU任務,
    // 沒有CPU令牌也可以執行,因此嘗試釋放CPU令牌,進入WorkerState.BLOCKING
    if (tryReleaseCpu(WorkerState.BLOCKING)) {
        // 如果釋放CPU令牌成功,則喚起一個執行緒去申請CPU令牌
        signalCpuWork()
    }
}
// 執行任務
fun runSafely(task: Task) {
    try {
        task.run()
    } catch (e: Throwable) {
        // 異常發生時,通知當前執行緒的例外處理Handler
        val thread = Thread.currentThread()
        thread.uncaughtExceptionHandler.uncaughtException(thread, e)
    } finally {
        unTrackTask()
    }
}
// 任務執行後
private fun afterTask(taskMode: Int) {
    // 如果執行的任務為純CPU任務,說明當前執行緒獲取到了CPU令牌,是核心執行緒,直接返回
    if (taskMode == TASK_NON_BLOCKING) return
    // 如果執行的是非CPU任務
    // 當前執行的非CPU任務數量減一
    decrementBlockingTasks()
    // 獲取當前執行緒狀態
    val currentState = state
    // 如果執行緒當前不是終止狀態
    if (currentState !== WorkerState.TERMINATED) {
        assert { currentState == WorkerState.BLOCKING }
        // 設定為休眠狀態
        state = WorkerState.DORMANT
    }
}

四.CoroutineScheduler類的dispatch方法

瞭解Worker類的工作機制後,接下來分析CoroutineScheduler類的dispatch方法,程式碼如下:

// block表示要執行的任務
// taskContext表示任務執行的上下文,裡面包含任務的型別,和執行完成後的回撥
// tailDispatch表示當前任務是否進行佇列尾部排程,
// 當tailDispatch為true時,當前block會在當前執行緒的本地佇列裡的任務全部執行完後再執行
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    // 上報時間,TimeSource相關,無需關注
    trackTask()
    // 建立任務
    val task = createTask(block, taskContext)
    // 獲取當前的Worker,可能獲取不到
    val currentWorker = currentWorker()
    // 將當前的任務新增到當前執行緒的本地佇列中
    val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
    // 不為空,說明沒有新增進去,說明當前的執行緒不是Worker
    if (notAdded != null) {
         // 將任務新增到全域性佇列中,如果新增失敗了
        if (!addToGlobalQueue(notAdded)) {
            // 說明執行緒池正在關閉,丟擲異常
            throw RejectedExecutionException("$schedulerName was terminated")
        }
    }
    // skipUnpark表示是否跳過喚起狀態,取決於這下面兩個引數
    val skipUnpark = tailDispatch && currentWorker != null
    // 如果當前型別為純CPU任務
    if (task.mode == TASK_NON_BLOCKING) {
        // 如果跳過喚醒,則直接返回
        if (skipUnpark) return
        // 喚醒一個執行純CPU任務的執行緒
        signalCpuWork()
    } else {
        // 喚醒一個執行非CPU任務的執行緒
        signalBlockingWork(skipUnpark = skipUnpark)
    }
}
// 建立任務
internal fun createTask(block: Runnable, taskContext: TaskContext): Task {
    // 獲取當前時間
    val nanoTime = schedulerTimeSource.nanoTime()
    // 如果當前的block是Task型別的
    if (block is Task) {
        // 重新設定提交時間和任務上下文
        block.submissionTime = nanoTime
        block.taskContext = taskContext
        // 返回
        return block
    }
    // 封裝成TaskImpl,返回
    return TaskImpl(block, nanoTime, taskContext)
}
// 任務模型
// block表示執行的任務
// submissionTime表示任務提交時間
// taskContext表示任務執行的上下文
internal class TaskImpl(
    @JvmField val block: Runnable,
    submissionTime: Long,
    taskContext: TaskContext
) : Task(submissionTime, taskContext) {
    override fun run() {
        try {
            block.run()
        } finally {
            // 任務執行完畢後,會在同一個Worker執行緒中回撥afterTask方法
            taskContext.afterTask()
        }
    }
    override fun toString(): String =
        "Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $taskContext]"
}
// 將任務新增到本地佇列
private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
    // 如果當前執行緒為空,則返回任務
    if (this == null) return task
    // 如果執行緒處於終止狀態,則返回任務
    if (state === WorkerState.TERMINATED) return task
    // 如果任務為純CPU任務,但是執行緒沒有CPU令牌
    if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
        // 則返回任務
        return task
    }
    // 標記本地佇列有任務
    mayHaveLocalTasks = true
    // 新增到佇列
    return localQueue.add(task, fair = tailDispatch)
}
// 新增到全域性佇列
private fun addToGlobalQueue(task: Task): Boolean {
    // 根據任務的型別,新增到全域性佇列的隊尾
    return if (task.isBlocking) {
        globalBlockingQueue.addLast(task)
    } else {
        globalCpuQueue.addLast(task)
    }
}
// 對當前執行緒進行強制轉換,如果排程器也是當前的排程器則返回Worker物件
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
// 喚起一個執行非純CPU任務的執行緒
private fun signalBlockingWork(skipUnpark: Boolean) {
    // 當前執行阻塞任務的執行緒數量加1,並獲取當前的控制狀態
    val stateSnapshot = incrementBlockingTasks()
    // 如果跳過喚起,則返回
    if (skipUnpark) return
    // 嘗試喚起,喚起成功,則返回
    if (tryUnpark()) return
    // 喚起失敗,則根據當前的控制狀態,嘗試建立新執行緒,成功則返回
    if (tryCreateWorker(stateSnapshot)) return
    // 再次嘗試喚起,防止多執行緒競爭情況下,上面的tryUnpark方法正好卡線上程釋放CPU令牌與進入PARKING狀態之間
    // 因為執行緒先釋放CPU令牌,後進入PARKING狀態
    tryUnpark()
}
// 喚起一個執行純CPU任務的執行緒
internal fun signalCpuWork() {
    // 嘗試喚起,喚起成功,則返回
    if (tryUnpark()) return
    // 喚起失敗,則嘗試建立新執行緒,成功則返回
    if (tryCreateWorker()) return
    // 再次嘗試喚起,防止多執行緒競爭情況下,上面的tryUnpark方法正好卡線上程釋放CPU令牌與進入PARKING狀態之間
    // 因為執行緒先釋放CPU令牌,後進入PARKING狀態
    tryUnpark()
}

通過對上面的程式碼進行分析,可以知道CoroutineScheduler類的dispatch方法,首先會對任務進行封裝。正常情況下,任務都會根據型別新增到全域性佇列中,接著根據任務型別,隨機喚起一個執行對應型別任務的執行緒去執行任務。

當任務執行完畢後,會回撥任務中自帶的afterTask方法。根據之前對LimitingDispatcher的分析,可以知道,此時tailDispatch引數為true,同時當前的執行緒也是Worker執行緒,因此會被直接新增到執行緒的本地佇列中,由於任務有對應的執行緒執行,因此跳過了喚起其他執行緒執行任務的階段。這裡我們可以稱這個機制為尾調機制。

為什麼CoroutineScheduler類中要設計一個尾調機制呢?

在傳統的執行緒池的執行緒充足情況下,一個任務到來時,會被分配一個執行緒。假設前後兩個任務A與B有依賴關係,需要在執行A再執行B,這時如果兩個任務同時到來,執行A任務的執行緒會直接執行,而執行B執行緒的任務可能需要被阻塞。而一旦執行緒阻塞會造成執行緒資源的浪費。而協程本質上就是多個小段程式的相互共同作業,因此這種場景會非常多,通過這種機制可以保證任務的執行順序,同時減少資源浪費,而且可以最大限度的保證一個連續的任務執行在同一個執行緒中。

至此,Dispatchers.IO執行緒池的工作原理全部分析完畢。

五.淺談WorkQueue類

1.add方法

接下來分析一些更加細節的過程。首先分析一下Worker執行緒本地佇列呼叫的add方法是如何新增任務的,程式碼如下:

// 本地佇列中儲存最後一次尾調的任務
private val lastScheduledTask = atomic<Task?>(null)
// fair表示是否公平的執行任務,FIFO,預設為false
fun add(task: Task, fair: Boolean = false): Task? {
    // fair為true,則新增到隊尾
    if (fair) return addLast(task)
    // 如果fair為false,則從lastScheduledTask中取出上一個尾調的任務,
    // 並把這次的新尾調任務儲存到lastScheduledTask
    val previous = lastScheduledTask.getAndSet(task) ?: return null
    // 如果獲取上一次的尾調任務不為空,則新增到隊尾
    return addLast(previous)
}

2.任務偷取機制

根據之前對Worker類的分析,任務偷取的核心程式碼鎖定在了WorkQueue類的兩個方法上:一個是偷取非純CPU任務的tryStealBlockingFrom方法,另一個可以偷所有型別任務的tryStealFrom方法,程式碼如下:

internal const val BUFFER_CAPACITY_BASE = 7
internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE // 1000 0000
internal const val MASK = BUFFER_CAPACITY - 1 // 0111 1111
// 儲存任務的陣列,最多儲存128
private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)
// producerIndex表示上一次向任務陣列中新增任務的索引
// consumerIndex表示上一次消費的任務索引
// producerIndex永遠大於等於consumerIndex
// 二者差值就是當前任務陣列中任務的數量
private val producerIndex = atomic(0)
private val consumerIndex = atomic(0)
// buffer中非純CPU任務的數量(避免遍歷掃描)
private val blockingTasksInBuffer = atomic(0)
// 偷所有型別任務
fun tryStealFrom(victim: WorkQueue): Long {
    assert { bufferSize == 0 }
    // 從要偷取執行緒的本地佇列中輪訓獲取一個任務
    val task  = victim.pollBuffer()
    // 如果獲取到了任務
    if (task != null) {
        // 將它新增到自己的本地佇列中
        val notAdded = add(task)
        assert { notAdded == null }
        // 返回偷取成功的標識
        return TASK_STOLEN
    }
    // 如果偷取失敗,嘗試偷取指定執行緒的尾調任務
    return tryStealLastScheduled(victim, blockingOnly = false)
}
// 輪訓獲取任務
private fun pollBuffer(): Task? {
    // 死迴圈
    while (true) {
        // 獲取上一次消費的任務索引
        val tailLocal = consumerIndex.value
        // 如果當前任務陣列中沒有多處的任務,則返回空
        if (tailLocal - producerIndex.value == 0) return null
        // 計算偷取位置,防止陣列過界
        val index = tailLocal and MASK
        // 通過CAS方式,將consumerIndex加一,表示下一次要從tailLocal + 1處開始偷取
        if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) {
            // 從偷取位置初取出任務,如果偷取的任務為空,則繼續迴圈
            val value = buffer.getAndSet(index, null) ?: continue
            // 偷取成功
            // 若任務為阻塞任務,blockingTasksInBuffer的值減一
            value.decrementIfBlocking()
            // 返回任務
            return value
        }
    }
}
// 偷取非純CPU任務
fun tryStealBlockingFrom(victim: WorkQueue): Long {
    assert { bufferSize == 0 }
    // 從consumerIndex位置開始偷
    var start = victim.consumerIndex.value
    // 偷到producerIndex處截止
    val end = victim.producerIndex.value
    // 獲取任務陣列
    val buffer = victim.buffer
    // 迴圈偷取
    while (start != end) {
        // 計算偷取位置,防止陣列過界
        val index = start and MASK
        // 如果非純CPU任務數為0,則直接退出迴圈
        if (victim.blockingTasksInBuffer.value == 0) break
        // 獲取index處的任務
        val value = buffer[index]
        // 如果任務存在,而且是非純CPU任務,同時成功的通過CAS設定為空
        if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) {
            // blockingTasksInBuffer的值減一
            victim.blockingTasksInBuffer.decrementAndGet()
            // 將偷取的任務新增到當前執行緒的本地佇列中
            add(value)
            // 返回偷取成功標識
            return TASK_STOLEN
        } else {
            // 如果偷取失敗,自增再次迴圈,從下一個位置開始偷
            ++start
        }
    }
    // 如果從任務陣列中偷取失敗,嘗試偷取指定執行緒的尾調任務
    return tryStealLastScheduled(victim, blockingOnly = true)
}
// 偷取指定執行緒的尾調任務
private fun tryStealLastScheduled(victim: WorkQueue, blockingOnly: Boolean): Long {
    // 死迴圈
    while (true) {
        // 獲取指定執行緒的尾調任務,如果任務不存在,則返回偷取失敗識別符號
        val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL
        // 如果要偷取的是非純CPU任務,但是任務型別為純CPU任務,說明只有核心執行緒才能偷
        // 返回偷取失敗識別符號
        if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL
        // 獲取當前時間
        val time = schedulerTimeSource.nanoTime()
        //計算任務從新增開始到現在經過的時長
        val staleness = time - lastScheduled.submissionTime
        // 如果時長小於偷取冷卻時間
        if (staleness < WORK_STEALING_TIME_RESOLUTION_NS) {
            // 返回當前執行緒需要等待的時間
            return WORK_STEALING_TIME_RESOLUTION_NS - staleness
        }
        // 通過CAS,將lastScheduledTask設定為空,防止被其他執行緒執行
        if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
            // 偷取成功,加入到當前執行緒的佇列中
            add(lastScheduled)
            // 返回偷取成功表示
            return TASK_STOLEN
        }
        // 繼續迴圈
        continue
    }
}
// 偷取冷卻時間,尾調任務從新增開始,
// 最少經過WORK_STEALING_TIME_RESOLUTION_NS時間才可以被偷
@JvmField
internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp(
    "kotlinx.coroutines.scheduler.resolution.ns", 100000L
)

六.總結

1.兩個執行緒池

CoroutineScheduler類是核心的執行緒池,用於任務的執行。LimitingDispatcher類對CoroutineScheduler類進行代理,是CoroutineScheduler類尾調機制的使用者,對任務進行初步排隊。

2.四種佇列

LimitingDispatcher類中的任務佇列。CoroutineScheduler類中的兩個全域性佇列。Worker類中的本地佇列。

3.尾調機制

一個任務執行完,可以通過回撥,在同一個Worker執行緒中再儲存一個待執行任務,該任務將在Worker執行緒本地佇列目前已存在的任務,執行完畢後再執行。

4.任務分類與許可權控制

所有任務分成純CPU任務和非純CPU任務兩種,對應著核心執行緒和非核心執行緒。

所有執行緒在執行前都先嚐試成為核心執行緒,核心執行緒可以從兩種任務中任意選擇執行,非核心執行緒只能執行非純CPU任務。核心執行緒如果選擇執行非純CPU任務會變成非核心執行緒

5.任務偷取機制

WorkQueue類根據隨機演演算法提供任務偷取機制,一個Worker執行緒可以從其他Worker執行緒的本地佇列中偷取任務。

6.執行梳理圖

到此這篇關於Android Dispatchers.IO執行緒池深入刨析的文章就介紹到這了,更多相關Android Dispatchers.IO內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com