首頁 > 軟體

Kotlin圖文並茂講解續體與續體攔截器和排程器

2022-08-01 18:05:42

一.Continuation

Continuation介面是協程中最核心的介面,代表著掛起點之後的續體,程式碼如下:

public interface Continuation<in T> {
    // 續體的上下文
    public val context: CoroutineContext
    // 該方法用於恢復續體的執行
    // result為掛起點執行完成的返回值,T為返回值的型別
    public fun resumeWith(result: Result<T>)
}

Continuation圖解

二.ContinuationInterceptor

ContinuationInterceptor介面繼承自Element介面,是協程中的續體攔截器,程式碼如下:

public interface ContinuationInterceptor : CoroutineContext.Element {
    // 攔截器的Key
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    // 攔截器對續體進行攔截時會呼叫該方法,並對continuation進行快取
    // 攔截判斷:根據傳入的continuation物件與返回的continuation物件是否相同
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    // 當interceptContinuation方法攔截的協程執行完畢後,會呼叫該方法
    public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /* do nothing by default */
    }
    // get方法多型實現
    public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
        @OptIn(ExperimentalStdlibApi::class)
        if (key is AbstractCoroutineContextKey<*, *>) {
            @Suppress("UNCHECKED_CAST")
            return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null
        }
        @Suppress("UNCHECKED_CAST")
        return if (ContinuationInterceptor === key) this as E else null
    }
    // minusKey方法多型實現
    public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {
        @OptIn(ExperimentalStdlibApi::class)
        if (key is AbstractCoroutineContextKey<*, *>) {
            return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this
        }
        return if (ContinuationInterceptor === key) EmptyCoroutineContext else this
    }
}

三.CoroutineDispatcher

CoroutineDispatcher類繼承自AbstractCoroutineContextElement類,實現了ContinuationInterceptor介面,是協程排程器的基礎類別,程式碼如下:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    // ContinuationInterceptor的多型實現,排程器本質上就是攔截器
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
        ContinuationInterceptor,
        { it as? CoroutineDispatcher })
    // 用於判斷排程器是否要呼叫dispatch方法進行排程,預設為true
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    // 排程的核心方法,在這裡進行排程,執行block
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    // 如果排程是由Yield方法觸發的,預設通過dispatch方法實現
    @InternalCoroutinesApi
    public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
    // ContinuationInterceptor介面的方法,將續體包裹成DispatchedContinuation,並傳入當前排程器
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    // 釋放父協程與子協程的關聯。
    @InternalCoroutinesApi
    public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
    }
    // 過載了"+"操作,直接返回others
    // 因為兩個排程器相加沒有意義,同一個上下文中只能有一個排程器
    // 如果需要加的是排程器物件,則直接替換成最新的,因此直接返回
    public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
    override fun toString(): String = "$classSimpleName@$hexAddress"
}

四.EventLoop

EventLoop類繼承自CoroutineDispatcher類,用於協程中任務的分發執行,只在runBlocking方法中和Dispatchers.Unconfined排程器中使用。與Handler中的Looper類似,在建立後會儲存在當前執行緒的ThreadLocal中。EventLoop本身不支援延時執行任務,如果需要可以自行繼承EventLoop並實現Delay介面,EventLoop中預留了一部分變數和方法用於延時需求的擴充套件。

為什麼協程需要EventLoop呢?協程的本質是續體傳遞,而續體傳遞的本質是回撥,假設在Dispatchers.Unconfined排程下,要連續執行多個suspend方法,就會有多個續體傳遞,假設suspend方法達到一定數量後,就會造成StackOverflow,進而引起崩潰。同樣的,我們知道呼叫runBlocking會阻塞當前執行緒,而runBlocking阻塞的原理就是執行“死迴圈”,因此需要在迴圈中做任務的分發,去執行內部協程在Dispatchers.Unconfined排程器下加入的任務。

EventLoop程式碼如下:

internal abstract class EventLoop : CoroutineDispatcher() {
    // 用於記錄使用當前EventLoop的runBlocking方法和Dispatchers.Unconfined排程器的數量
    private var useCount = 0L
    // 表示當前的EventLoop是否被暴露給其他的執行緒
    // runBlocking會將EventLoop暴露給其他執行緒
    // 因此,當runBlocking使用時,shared必須為true
    private var shared = false
    // Dispatchers.Unconfined排程器的任務執行佇列
    private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
    // 處理任務佇列的下一個任務,該方法只能在EventLoop所在的執行緒呼叫
    // 返回值<=0,說明立刻執行下一個任務
    // 返回值>0,說明等待這段時間後,執行下一個任務
    // 返回值為Long.MAX_VALUE,說明佇列裡沒有任務了 
    public open fun processNextEvent(): Long {
        if (!processUnconfinedEvent()) return Long.MAX_VALUE
        return 0
    }
    // 佇列是否為空
    protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
    // 下一個任務多長時間後執行
    protected open val nextTime: Long
        get() {
            val queue = unconfinedQueue ?: return Long.MAX_VALUE
            return if (queue.isEmpty) Long.MAX_VALUE else 0L
        }
    // 任務的核心處理方法
    public fun processUnconfinedEvent(): Boolean {
        // 若佇列為空,則返回
        val queue = unconfinedQueue ?: return false
        // 從隊首取出一個任務,如果為空,則返回
        val task = queue.removeFirstOrNull() ?: return false
        // 執行
        task.run()
        return true
    }
    // 表示當前EventLoop是否可以在協程上下文中被呼叫
    // EventLoop本質上也是協程上下文
    // 如果EventLoop在runBlocking方法中使用,必須返回true
    public open fun shouldBeProcessedFromContext(): Boolean = false
    // 向佇列中新增一個任務
    public fun dispatchUnconfined(task: DispatchedTask<*>) {
        // 若佇列為空,則建立一個新的佇列
        val queue = unconfinedQueue ?:
            ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
        queue.addLast(task)
    }
    // EventLoop當前是否還在被使用
    public val isActive: Boolean
        get() = useCount > 0
    // EventLoop當前是否還在被Unconfined排程器使用
    public val isUnconfinedLoopActive: Boolean
        get() = useCount >= delta(unconfined = true)
    // 判斷佇列是否為空
    public val isUnconfinedQueueEmpty: Boolean
        get() = unconfinedQueue?.isEmpty ?: true
    // 下面三個方法用於計算使用當前的EventLoop的runBlocking方法和Unconfined排程器的數量
    // useCount是一個64位元的數,
    // 它的高32位元用於記錄Unconfined排程器的數量,低32位元用於記錄runBlocking方法的數量
    private fun delta(unconfined: Boolean) =
        if (unconfined) (1L shl 32) else 1L
    fun incrementUseCount(unconfined: Boolean = false) {
        useCount += delta(unconfined)
        // runBlocking中使用,shared為true
        if (!unconfined) shared = true 
    }
    fun decrementUseCount(unconfined: Boolean = false) {
        useCount -= delta(unconfined)
        // 如果EventLoop還在被使用
        if (useCount > 0) return
        assert { useCount == 0L }
        // 如果EventLoop不被使用了,並且在EventLoop中使用過
        if (shared) {
            // 關閉相關資源,並在ThreadLocal中移除
            shutdown()
        }
    }
    protected open fun shutdown() {}
}

協程中提供了EventLoopImplBase類,間接繼承自EventLoop,實現了Delay介面,用來延時執行任務。同時,協程中還提供單例物件ThreadLocalEventLoop用於EventLoop在ThreadLocal中的儲存。

到此這篇關於Kotlin圖文並茂講解續體與續體攔截器和排程器的文章就介紹到這了,更多相關Kotlin續體內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com