首頁 > 軟體

Kotlin 協程的取消機制詳細解讀

2022-10-26 14:01:46

引言

在 Java 語言中提供了執行緒中斷的能力,但並不是所有的執行緒都可以中斷的,因為 interrupt 方法並不是真正的終止執行緒,而是將一個標誌位標記為中斷狀態,當執行到下一次中斷標誌位檢查時,才能觸發終止執行緒。

但無論如何,終止執行緒是一個糟糕的方案,因為線上程的銷燬和重建,是要消耗系統資源的,造成了不必要的開銷。Kotlin 協程提供了更優雅的取消機制,這也是協程比較核心的功能之一。

協程的狀態

在瞭解取消機制之前我們需要知道一些關於 Job 狀態的內容:

StateisActive(是否活躍)isCompleted(是否完成)isCancelled(是否取消)
New (可選初始狀態)falsefalsefalse
Active (預設初始狀態)truefalsefalse
Completing (短暫態)truefalsefalse
Cancelling (短暫態)falsefalsetrue
Cancelled (完成態)falsetruetrue
Completed (完成態)falsetruefalse

可以看出,在完成和取消的過程中,會經過一個短暫的進行中的狀態,然後才變成已完成/已取消。

在這裡只關注一下取消相關的狀態:

  • Cancelling
  • 丟擲異常的 Job 會導致其進入 Cancelling 狀態,也可以使用 cancel 方法來隨時取消 Job 使其立即轉換為 Cancelling 狀態。
  • Cancelled
  • 當它遞迴取消子項,並等待所有的子項都取消後,該 Job 會進入 Cancelled 狀態。

取消協程的用法

協程在程式碼中抽象的型別是 Job , 下面是一個官方的程式碼範例,用來展示如何取消協程的執行:

suspend fun main(): Unit = coroutineScope {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion 
    println("main: Now I can quit.")
}

它的輸出是:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

一旦 mian 方法中呼叫了 job.cancel() ,我們就看不到其他協程的任何輸出,因為它已被取消了。

協程取消的有效性

協程程式碼必須通過與掛起函數的配合才能被取消。kotlinx.coroutines 中所有掛起函數(帶有 suspend 關鍵字函數)都是可以被取消的。suspend 函數會檢查協程是否需要取消並在取消時丟擲 CancellationException

但是,如果協程在執行過程中沒有掛起點,則不能取消協程,如下例所示:

suspend fun main(): Unit = coroutineScope {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) { // computation loop, just wastes CPU
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

在這個 job 中,並沒有執行任何 suspend 函數,所以在執行過程中並沒有對協程是否需要取消進行檢查,自然也就無法觸發取消。

同樣的問題也可以在通過 捕獲 CancellationException 並且不丟擲的情況下 觀察到:

suspend fun main(): Unit = coroutineScope {
    val job = launch(Dispatchers.Default) {
        repeat(5) { i ->
            try {
                // print a message twice a second
                println("job: I'm sleeping $i ...")
                delay(500)
            } catch (e: Exception) {
                // log the exception
                println(e)
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

列印結果是:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9
job: I'm sleeping 3 ...
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9
job: I'm sleeping 4 ...
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9
main: Now I can quit.

從列印結果來看,迴圈 5 次全部執行了,好像取消並沒有起到作用。但實際上不是這樣的,為了便於觀察加上時間戳:

1665217217682: job: I'm sleeping 0 ...
1665217218196: job: I'm sleeping 1 ...
1665217218697: job: I'm sleeping 2 ...
1665217218996: main: I'm tired of waiting!
1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d
1665217219000: job: I'm sleeping 3 ...
1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d
1665217219000: job: I'm sleeping 4 ...
1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d
1665217219001: main: Now I can quit.

加上時間可以看出,丟擲第一次異常後的兩次迴圈和異常捕獲都是在同一瞬間完成的。這說明了捕獲到異常後,仍然會執行程式碼,但是所有的 delay 方法都沒有生效,即該 Job 的所有子 Job 都失效了。但該 Job 仍在繼續迴圈列印。原因是,父 Job 會等所有子 Job 處理結束後才能完成取消。

而如果我們不使用 try-catch 呢?

suspend fun main(): Unit = coroutineScope {
    val job = launch(Dispatchers.Default) {
        repeat(5) { i ->
            // print a message twice a second
            println("job: I'm sleeping $i ...")
            delay(500)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

列印結果:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

很順利的取消了,這是因為協程丟擲 Exception 直接終止了。

注意協程丟擲 CancellationException 並不會導致 App Crash 。

使用 try-catch 來捕獲 CancellationException 時需要注意,在掛起函數前的程式碼邏輯仍會多次執行,從而導致這部分程式碼彷彿沒有被取消一樣。

如何寫出可以取消的程式碼

有兩種方法可以使程式碼是可取消的。第一種方法是定期呼叫掛起函數,檢查是否取消,就是上面的例子中的方法;另一個是顯式檢查取消狀態:

suspend fun main(): Unit = coroutineScope {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // cancellable computation loop
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

將上面的迴圈 5 次通過使用 while (isActive) 進行替換,實現顯示檢查取消的程式碼。isActive 是通過 CoroutineScope 物件在協程內部可用的擴充套件屬性。

在 finally 中釋放資源

在前面的例子中我們使用 try-catch 捕獲 CancellationException 發現會產生父協程等待所有子協程完成後才能完成,所以建議不用 try-catch 而是 try{…} finally{…} ,讓父協程在被取消時正常執行終結操作:

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        println("job: I'm running finally")
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

join 和 cancelAndJoin 都要等待所有終結操作完成,所以上面的例子產生了以下輸出:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.

使用不可取消的 block

如果在在上面的範例的 finally 程式碼塊中使用 suspend 函數,會導致丟擲 CancellationException 。

因為執行這些程式碼的協程已經被取消了。通常情況下這不會有任何問題,然而,在極少數情況下,如果你需要在 finally 中使用一個掛起函數,你可以通過使用 withContext(NonCancellable) { ... }

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        withContext(NonCancellable) {
            println("job: I'm running finally")
            delay(1000L)
            println("job: And I've just delayed for 1 sec because I'm non-cancellable")
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

CancellationException

在上面的內容中,我們知道協程的取消是通過丟擲 CancellationException 來進行的,神奇的是丟擲 Exception 並沒有導致應用程式 Crash 。

CancellationException 的真實實現是 j.u.c. 中的 CancellationException :

public actual typealias CancellationException = java.util.concurrent.CancellationException

如果協程的 Job 被取消,則由可取消的掛起函數丟擲 CancellationException 。它表示協程的正常取消。在預設的 CoroutineExceptionHandler 下,它不會列印到控制檯/紀錄檔。

上面參照了這個類的註釋,看來處理丟擲異常的邏輯在 CoroutineExceptionHandler 中:

public interface CoroutineExceptionHandler : CoroutineContext.Element {
    /**
     * Key for [CoroutineExceptionHandler] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
    /**
     * Handles uncaught [exception] in the given [context]. It is invoked
     * if coroutine has an uncaught exception.
     */
    public fun handleException(context: CoroutineContext, exception: Throwable)
}

通常,未捕獲的 Exception 只能由使用協程構建器的根協程產生。所有子協程都將異常的處理委託給他們的父協程,父協程也委託給它自身的父協程,直到委託給根協程處理。所以在子協程中的 CoroutineExceptionHandler 永遠不會被使用。

使用 SupervisorJob 執行的協程不會將異常傳遞給它們的父協程,SupervisorJob 被視為根協程。

使用 async 建立的協程總是捕獲它的所有異常通過結果 Deferred 物件回撥出去,因此它不能導致未捕獲的異常。

CoroutineExceptionHandler 用於記錄異常、顯示某種型別的錯誤訊息、終止和/或重新啟動應用程式。

如果需要在程式碼的特定部分處理異常,建議在協程中的相應程式碼周圍使用 try-catch。通過這種方式,您可以阻止異常協程的完成(異常現在被捕獲),重試操作,和/或採取其他任意操作。 這也就是我們前面論證的在協程中使用 try-catch 導致的取消失效。

預設情況下,如果協程沒有設定用於處理異常的 Handler ,未捕獲的異常將按以下方式處理:

如果 exception 是 CancellationException ,那麼它將被忽略(因為這是取消正在執行的協程的假定機制)。

其他情況:

  • 如果上下文中有一個 Job,那麼呼叫 job.cancel()
  • 否則,通過 ServiceLoader 找到的 CoroutineExceptionHandler 的所有範例並呼叫當前執行緒的 Thread.uncaughtExceptionHandler 來處理異常。

超時取消

取消協程執行的最合適的應用場景是它的執行時間超過了規定的最大時間時自動取消任務。在 Kotlin 協程庫中提供了 withTimeout 方法來實現這個功能:

withTimeout(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
}

執行結果:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

TimeoutCancellationException 是 CancellationException 的子類,TimeoutCancellationException 通過 withTimeout 函數丟擲。

在本例中,我們在main函數中使用了withTimeout ,執行過程中會導致 Crash 。

有兩種解決辦法,就是使用 try{…} catch (e: TimeoutCancellationException){…} 程式碼塊;另一種辦法是使用在超時的情況下不是丟擲異常而是返回 null 的 withTimeoutOrNull 函數:

val result = withTimeoutOrNull(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
    "Done" // will get cancelled before it produces this result
}
println("Result is $result")

列印結果:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null

非同步的超時和資源

withTimeout 中的超時事件相對於在其塊中執行的程式碼是非同步的,並且可能在任何時間發生,甚至在從超時塊內部返回之前。如果你在塊內部開啟或獲取一些資源,需要關閉或釋放到塊外部。

例如,在這裡,我們用 Resource 類模擬一個可關閉資源,它只是通過對獲得的計數器遞增,並對該計數器從其關閉函數遞減來跟蹤建立次數。讓我們用小超時執行大量的協程,嘗試在一段延遲後從withTimeout塊內部獲取這個資源,並從外部釋放它。

var acquired = 0
class Resource {
    init { acquired++ } // Acquire the resource
    fun close() { acquired-- } // Release the resource
}
fun main() {
    runBlocking {
        repeat(100_000) { // Launch 100K coroutines
            launch { 
                val resource = withTimeout(60) { // Timeout of 60 ms
                    delay(50) // Delay for 50 ms
                    Resource() // Acquire a resource and return it from withTimeout block     
                }
                resource.close() // Release the resource
            }
        }
    }
    // Outside of runBlocking all coroutines have completed
    println(acquired) // Print the number of resources still acquired
}

如果執行上面的程式碼,您將看到它並不總是列印 0,儘管它可能取決於您的機器的時間,在本例中您可能需要調整超時以實際看到非零值。

要解決這個問題,可以在變數中儲存對資源的參照,而不是從withTimeout塊返回它。

fun main() {
    runBlocking {
        repeat(100_000) { // Launch 100K coroutines
            launch {
                var resource: Resource? = null // Not acquired yet
                try {
                    withTimeout(60) { // Timeout of 60 ms
                        delay(50) // Delay for 50 ms
                        resource = Resource() // Store a resource to the variable if acquired
                    }
                    // We can do something else with the resource here
                } finally {
                    resource?.close() // Release the resource if it was acquired
                }
            }
        }
    }
// Outside of runBlocking all coroutines have completed
    println(acquired) // Print the number of resources still acquired
}

這樣這個例子總是輸出0。資源不會洩漏。

取消檢查的底層原理

在探索協程取消的有效性時,我們知道協程程式碼必須通過與掛起函數的配合才能被取消。

kotlinx.coroutines 中所有掛起函數(帶有 suspend 關鍵字函數)都是可以被取消的。suspend 函數會檢查協程是否需要取消並在取消時丟擲 CancellationException 。

關於協程的取消機制,很明顯和 suspend 關鍵字有關。為了測試 suspend 關鍵字的作用,實現下面的程式碼:

class Solution {
    suspend fun func(): String {
        return "測試 suspend 關鍵字"
    }
}

作為對照組,另一個是不加 suspend 關鍵字的 func 方法:

class Solution {
    fun func(): String {
        return "測試 suspend 關鍵字"
    }
}

兩者反編譯成 Java :

// 普通的方法
public final class Solution {
    public static final int $stable = LiveLiterals$SolutionKt.INSTANCE.Int$class-Solution();
    @NotNull
    public final String func() {
        return LiveLiterals$SolutionKt.INSTANCE.String$fun-func$class-Solution();
    }
}
// 帶有 suspend 關鍵字的方法
public final class Solution {
    public static final int $stable = LiveLiterals$SolutionKt.INSTANCE.Int$class-Solution();
    @Nullable
    public final Object func(@NotNull Continuation<? super String> $completion) {
        return LiveLiterals$SolutionKt.INSTANCE.String$fun-func$class-Solution();
    }
}

suspend 關鍵字修飾的方法反編譯後預設生成了帶有 Continuation 引數的方法。說明 suspend 關鍵字的玄機在 Continuation 類中。

Continuation 是 Kotlin 協程的核心思想 Continuation-Passing Style 的實現。原理參考簡述協程的底層實現原理

通過在普通函數的引數中增加一個 Continuation 引數,這個 continuation 的性質類似於一個 lambda 物件,將方法的返回值型別傳遞到這個 lambda 程式碼塊中。

什麼意思呢?就是本來這個方法的返回型別直接 return 出來的:

val a: String = func()
print(a)

而經過 suspend 修飾,程式碼變成了這個樣子:

func { a ->
    print(a)
}

Kotlin 協程就是通過這樣的包裝,將比如 launch 方法,實際上是 launch 最後一個引數接收的是 lambda 引數。也就是把外部邏輯傳遞給函數內部執行。

回過頭來再來理解 suspend 關鍵字,我們知道帶有 suspend 關鍵字的方法會對協程的取消進行檢查,從而取消協程的執行。從這個能力上來看,我理解他應該會自動生成類似下面的邏輯程式碼:

生成的函數 {
    if(!當前協程.isActive) {
        throw CancellationException()
    }
    // ... 這裡是函數真實邏輯
}

suspend 修飾的函數,會自動生成一個掛起點,來檢查協程是否應該被掛起。

顯然 Continuation 中宣告的函數也證實了掛起的功能:

public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext
    /**
     * 恢復相應協程的執行,將成功或失敗的結果作為最後一個掛起點的返回值傳遞。
     */
    public fun resumeWith(result: Result<T>)
}

協程本質上是產生了一個 switch 語句,每個掛起點之間的邏輯都是一個 case 分支的邏輯。參考 協程是如何實現的 中的例子:

        Function1 lambda = (Function1)(new Function1((Continuation)null) {
            int label;
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
                byte text;
                @BlockTag1: {
                    Object result;
                    @BlockTag2: {
                        result = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                        switch(this.label) {
                            case 0:
                                ResultKt.throwOnFailure($result);
                                this.label = 1;
                                if (SuspendTestKt.dummy(this) == result) {
                                    return result;
                                }
                                break;
                            case 1:
                                ResultKt.throwOnFailure($result);
                                break;
                            case 2:
                                ResultKt.throwOnFailure($result);
                                break @BlockTag2;
                            case 3:
                                ResultKt.throwOnFailure($result);
                                break @BlockTag1;
                            default:
                                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                        }
                        text = 1;
                        System.out.println(text);
                        this.label = 2;
                        if (SuspendTestKt.dummy(this) == result) {
                            return result;
                        }
                    }
                    text = 2;
                    System.out.println(text);
                    this.label = 3;
                    if (SuspendTestKt.dummy(this) == result) {
                        return result;
                    }
                }
                text = 3;
                System.out.println(text);
                return Unit.INSTANCE;
            }
            @NotNull
            public final Continuation create(@NotNull Continuation completion) {
                Intrinsics.checkNotNullParameter(completion, "completion");
                Function1 funcation = new <anonymous constructor>(completion);
                return funcation;
            }
            public final Object invoke(Object object) {
                return ((<undefinedtype>)this.create((Continuation)object)).invokeSuspend(Unit.INSTANCE);
            }
        });

可以看出,在每個分支都會執行一次 ResultKt.throwOnFailure($result); ,從名字上就知道,這就是檢查是否需要取消並丟擲異常的程式碼所在:

@PublishedApi
@SinceKotlin("1.3")
internal fun Result<*>.throwOnFailure() {
    if (value is Result.Failure) throw value.exception
}

這裡的 Result 類是一個包裝類,它將成功的結果封裝為型別 T 的值,或將失敗的結果封裝為帶有任意Throwable異常的值。

        @Suppress("INAPPLICABLE_JVM_NAME")
        @InlineOnly
        @JvmName("success")
        public inline fun <T> success(value: T): Result<T> =
            Result(value)
        /**
         * Returns an instance that encapsulates the given [Throwable] [exception] as failure.
         */
        @Suppress("INAPPLICABLE_JVM_NAME")
        @InlineOnly
        @JvmName("failure")
        public inline fun <T> failure(exception: Throwable): Result<T> =
            Result(createFailure(exception))

成功和失敗的方法型別是不一樣的,證實了這一點,success 方法接收型別為 T 的引數;failure 接收 Throwable 型別的引數。

到這裡 suspend 方法掛起的原理就明瞭了:在協程的狀態機中,通過掛起點會分割出不同的狀態,對每一個狀態,會先進行掛起結果的檢查。 這會導致以下結果:

  • 協程的取消機制是通過掛起函數的掛起點檢查來進行取消檢查的。證實了為什麼如果沒有 suspend 函數(本質是掛起點),協程的取消就不會生效。
  • 協程的取消機制是需要函數合作的,就是通過 suspend 函數來增加取消檢查的時機。
  • 父協程會執行完所有的子協程(掛起函數),因為程式碼的本質是一個迴圈執行 switch 語句,當一個子協程(或掛起函數)執行結束,會繼續執行到下一個分支。但是最後一個掛起點後續的程式碼並不會被執行,因為最後一個掛起點檢查到失敗,不會繼續跳到最後的 label 分支。

以上就是Kotlin 協程的取消機制詳細解讀的詳細內容,更多關於Kotlin 協程取消機制的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com