首頁 > 軟體

Kotlin協程操作之建立啟動掛起恢復詳解

2022-08-01 18:05:38

下面以launch方法為例進行分析。

一.協程的建立

launch方法的程式碼如下:

// CoroutineScope的擴充套件方法
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // 根據當前上下文,計算得到新的上下文
    val newContext = newCoroutineContext(context)
    // 根據啟動模式,建立不同的續體
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    // 啟動協程
    coroutine.start(start, coroutine, block)
    return coroutine
}

newCoroutineContext用於計算新的上下文,程式碼如下:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    // coroutineContext為CoroutineScope中儲存的全域性變數
    // 對上下文進行相加
    val combined = coroutineContext + context
    // 用於debug
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    // 如果上下文中沒有排程器,則新增一個預設的排程器
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

1.start方法

在不指定協程啟動模式的情況下,協程將按照DEFAULT模式啟動,在上述程式碼中,會呼叫StandaloneCoroutine物件的start方法。StandaloneCoroutine的程式碼如下:

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

StandaloneCoroutine類中僅重寫了handleJobException方法,用於處理父協程不處理的異常。因此這裡呼叫的start方法實際是父類別AbstractCoroutine的方法,AbstractCoroutine類的start方法程式碼如下:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    // 該方法用於完成父協程與子協程的繫結關聯,同時確保父協程啟動
    initParentJob()
    // 該方法的寫法等同於start.invoke(block, receiver, this)
    // 因此呼叫的CoroutineStart類的方法
    start(block, receiver, this)
}

AbstractCoroutine類的start方法內,呼叫了CoroutineStart類的invoke方法。

2.CoroutineStart類

CoroutineStart是一個列舉類,用於根據不同的啟動模式去啟動協程,程式碼如下:

public enum class CoroutineStart {
    // 四種啟動模式
    DEFAULT,
    LAZY,
    // 具有實驗性,慎用
    @ExperimentalCoroutinesApi
    ATOMIC,
    // 具有實驗性,慎用
    @ExperimentalCoroutinesApi
    UNDISPATCHED;
    // 根據不同的啟動策略,啟動協程,執行block
    @InternalCoroutinesApi
    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // 該模式不主動啟動,等待使用者呼叫start方法
        }
    // 根據不同的啟動策略,啟動協程,執行block
    @InternalCoroutinesApi
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit
        }
    // 當前的啟動模式是否為懶啟動
    @InternalCoroutinesApi
    public val isLazy: Boolean get() = this === LAZY
}

CoroutineStart類中有兩個invoke方法,其中一個引數中有receiver,另一個沒有receiver。在Kotlin協程中,很多方法都過載了帶有receiver的方法和不帶有receiver的方法。

receiver用於為block執行提供一個環境。Kotlin中提供的啟動協程的方法都是通過帶receiver引數的start方法實現。通過receiver環境,可以更方便的實現一些操作,比如在launch啟動的協程中再次呼叫launch啟動新的協程。在沒有receiver的環境下執行block,則更像是在suspend方法中執行,如果需要啟動其他的協程,需要自己提供環境。

3.startCoroutineCancellable方法

startCoroutineCancellable是一個擴充套件方法,用來建立一個可以取消的協程,程式碼如下:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // createCoroutineUnintercepted:建立協程
        // intercepted:攔截排程
        // resumeCancellableWith:恢復執行
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
    }
// 如果建立的過程發生異常,則通知續體恢復後續程式碼的執行
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    try {
        block()
    } catch (e: Throwable) {
        completion.resumeWith(Result.failure(e))
    }
}

4.createCoroutineUnintercepted方法

createCoroutineUnintercepted方法用於建立一個新的、可掛起的、不受干擾的協程。

public expect fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit>

在Kotlin中有很多被expect關鍵字標記的介面方法,需要找到對應平臺下被actual標記的實現方法。

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    // 用於debug
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

createCoroutineUnintercepted方法建立的協程需要手動呼叫resumeWith方法才可以啟動,但重複的呼叫resumeWith方法可能會導致狀態機發生異常。同時,引數中傳入的completion可能會在任意的上下文中被呼叫。

正常情況下,我們編寫的lambda表示式——block,在編譯器編譯時,會自動生成一個類,並繼承SuspendLambda類,實現Continuation等介面。因為SuspendLambda繼承自ContinuationImpl,ContinuationImpl繼承自BaseContinuationImpl,所以才有了上述程式碼中的判斷邏輯。

如果當前的block物件的型別為BaseContinuationImpl,則呼叫create方法,這裡的create方法是編譯器生成的類裡的重寫方法,它的內部就是通過我們傳入的引數,建立並返回根據blcok生成的類的一個範例物件。

如果當前的block物件的型別不為BaseContinuationImpl,則需要通過createCoroutineFromSuspendFunction方法建立協程。這裡假設lambda表示式的型別不是BaseContinuationImpl。

5.createCoroutineFromSuspendFunction方法

該方法用於在createCoroutineUnintercepted方法中使用,當一個被suspend修飾的lambda表示式沒有繼承BaseContinuationImpl類時,則通過此方法建立協程。

有兩種情況會呼叫該方法建立協程:第一種情況是lambda表示式中呼叫了其他的掛起方法;第二種情況是掛起方法是通過Java實現的。

createCoroutineFromSuspendFunction方法的程式碼如下:

private inline fun <T> createCoroutineFromSuspendFunction(
    completion: Continuation<T>,
    crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
    val context = completion.context
    // 如果上下文為空
    return if (context === EmptyCoroutineContext)
        // 建立一個受限協程
        object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow()
                        block(this)
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow()
                    }
                    else -> error("This coroutine had already completed")
                }
        }
    else // 不為空,則建立一個正常的協程
        object : ContinuationImpl(completion as Continuation<Any?>, context) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow()
                        block(this)
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow()
                    }
                    else -> error("This coroutine had already completed")
                }
        }
}

受限協程是指協程在執行過程中的,只能呼叫協程作用域中提供的掛起方法發生掛起,其他掛起方法不能呼叫,因為在掛起方法會對續體進行攔截,可能導致後續程式碼的執行變得無法預測。

典型的例子就是sequence方法,它建立的協程就是受限協程,只能通過呼叫yield方法或者yieldAll方法才能發生掛起。由於受限協程中不能進行協程排程,因此其上下文是空的。

這裡launch方法的上下文有一個預設排程器,因此會建立一個ContinuationImpl物件。

到這裡,協程完成了建立。

二.協程的啟動

再次回到startCoroutineCancellable方法,當呼叫createCoroutineUnintercepted建立好協程後,會呼叫intercepted方法,程式碼如下:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

intercepted方法是Continuation介面的擴充套件方法,內部呼叫了ContinuationImpl類的intercepted方法。

1.ContinuationImpl類

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
    public override val context: CoroutineContext
        get() = _context!!
    @Transient
    private var intercepted: Continuation<Any?>? = null
    // 如果沒有快取,則從上下文中獲取攔截器,呼叫interceptContinuation進行攔截,
    // 將攔截的續體儲存到全域性變數
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}

這裡的ContinuationInterceptor指的就是在newCoroutineContext方法中傳入的Dispatchers.Default排程器。CoroutineDispatcher類的interceptContinuation方法的程式碼如下:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
     ...
    // 將續體包裹成DispatchedContinuation,並傳入當前排程器 
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    ...
}

2.resumeCancellableWith方法

再次回到startCoroutineCancellable方法,當呼叫intercepted方法進行攔截後,會呼叫resumeCancellableWith方法,程式碼如下:

public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
}

由於當前的Continuation物件的型別為DispatchedContinuation,因此呼叫DispatchedContinuation類的resumeCancellableWith方法,程式碼如下:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    ...
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancellableWith(result: Result<T>) {
        val state = result.toState()
        // 是否進行排程
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            // 進行排程
            dispatcher.dispatch(context, this)
        } else {// Dispatcher.Unconfined排程器會走這裡
            executeUnconfined(state, MODE_CANCELLABLE) {
                // 協程未被取消
                if (!resumeCancelled()) {
                    // 恢復執行
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
    // 恢復執行前判斷協程是否已經取消執行
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancelled(): Boolean {
        // 獲取當前的協程任務
        val job = context[Job]
        // 如果不為空且不活躍
        if (job != null && !job.isActive) {
            // 丟擲異常
            resumeWithException(job.getCancellationException())
            return true
        }
        return false
    }
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeUndispatchedWith(result: Result<T>) {
        // 該方法在指定的上下文中執行,在執行後同步協程上下文變化
        withCoroutineContext(context, countOrElement) {
            // 呼叫續體的resumeWith方法
            continuation.resumeWith(result)
        }
    }
    ...
}
// Dispatchers.Unconfined模式下的排程
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    // 從ThreadLocal中獲取EventLoop
    val eventLoop = ThreadLocalEventLoop.eventLoop
    // doYield表示是否正在讓出執行
    // 如果正在讓出執行,並且執行佇列還是空的,說明不需要執行,返回false
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
    // 如果EventLoop當前還在被Unconfined排程器使用
    return if (eventLoop.isUnconfinedLoopActive) {
        _state = contState
        resumeMode = mode
        // 向佇列中新增當前的任務
        eventLoop.dispatchUnconfined(this)
        // 返回 true
        true
    } else {
        // 重新執行EventLoop
        runUnconfinedEventLoop(eventLoop, block = block)
        // 返回false
        false
    }
}

runUnconfinedEventLoop方法是一個擴充套件方法,用於啟動EventLoop,程式碼如下:

internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    // 參照計數+1
    eventLoop.incrementUseCount(unconfined = true)
    try {
        // 先執行當前的任務
        block()
        // 迴圈分發任務
        while (true) {
            // 全部執行完畢,則退出分發
            if (!eventLoop.processUnconfinedEvent()) break
        }
    } catch (e: Throwable) {
        handleFatalException(e, null)
    } finally {
        // 參照計數+1
        eventLoop.decrementUseCount(unconfined = true)
    }
}

Dispatchers.Default排程器與Dispatchers.Unconfined排程器的排程邏輯基本都相同,最終都是呼叫Contination物件的resumeWith方法,同時傳入Result物件作為引數。

這裡的Contination是createCoroutineUnintercepted方法建立的繼承ContinuationImpl的匿名內部類物件。Result是resumeCancellableWith方法傳入的Result.success(Unit)物件,因為首次啟動,所以傳入型別為Unit。

呼叫匿名內部類的resumeWith方法,實際呼叫的是父類別BaseContinuationImpl的resumeWith方法。

3.BaseContinuationImpl類

BaseContinuationImpl類的resumeWith方法的程式碼如下:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        // 迴圈
        while (true) {
            // 用於debug
            probeCoroutineResumed(current)
            // current環境下
            with(current) {
                // completion用於續體執行完的回撥,為空,則丟擲異常
                // 這裡的completion就是一開始建立的StandaloneCoroutine物件
                val completion = completion!! 
                // 獲取執行後的結果
                val outcome: Result<Any?> =
                    try {
                        // 核心執行
                        val outcome = invokeSuspend(param)
                        // 如果返回值為COROUTINE_SUSPENDED,說明協程掛起,退出迴圈
                        if (outcome === COROUTINE_SUSPENDED) return
                        // 返回結果成功
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // 返回結果失敗
                        Result.failure(exception)
                    }
                // 釋放攔截的續體,狀態機終止
                releaseIntercepted() 
                // 這裡沒有直接呼叫resume,而是通過迴圈代替遞迴
                // 這也是resumeWith方法宣告為final的原因
                if (completion is BaseContinuationImpl) {
                    // 這種情況一般為多個suspend方法按順序執行
                    // 等待下一次迴圈
                    current = completion
                    param = outcome
                } else {
                    // 返回結果
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }
    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }
    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }
     ...
}

4.invokeSuspend方法

在上述程式碼中,resumeWith方法內部呼叫了invokeSuspend方法,這裡的invokeSuspend方法實際就是createCoroutineFromSuspendFunction方法中建立的匿名內部類的invokeSuspend方法。匿名內部類的程式碼如下:

object : ContinuationImpl(completion as Continuation<Any?>, context) {
    // 初始狀態
    private var label = 0
    override fun invokeSuspend(result: Result<Any?>): Any? =
            when (label) {
                0 -> {
                    label = 1
                    // 先去獲取一次結果,如果有異常,則直接丟擲,避免執行
                    // 比如在排程器中,如果發現協程已經取消,
                    // 則呼叫resumeWithException方法,在這裡直接被丟擲
                    result.getOrThrow()
                    // 把當前續體傳入,執行協程
                    // 可能發生掛起
                    block(this)
                }
                1 -> {
                    // 如果協程發生了掛起,那麼恢復掛起後會走到這裡
                    label = 2
                    // 獲取最終的執行結果
                    result.getOrThrow()
                }
                else -> error("This coroutine had already completed")
            }
}

三.協程的掛起與恢復

通過上述程式碼的分析,協程的掛起實際就是在協程返回結果時返回一個COROUTINE_SUSPENDED物件,在收到COROUTINE_SUSPENDED結果後直接返回,等待被再次呼叫resumeWith恢復。

COROUTINE_SUSPENDED物件定義在列舉類CoroutineSingletons中,程式碼如下:

internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }

該列舉類代表了協程的三個狀態,協程在建立後狀態為UNDECIDED,如果執行過程中發生掛起,則狀態變為COROUTINE_SUSPENDED,最後掛起恢復後狀態變為RESUMED。

而協程的恢復實際就是在掛起方法執行完成後,通過呼叫協程執行時傳入的續體的resumeWith方法,恢復後續程式碼的執行。

到此這篇關於Kotlin協程操作之建立啟動掛起恢復詳解的文章就介紹到這了,更多相關Kotlin協程操作內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com