<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
下面以launch方法為例進行分析。
launch方法的程式碼如下:
// CoroutineScope的擴充套件方法 public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { // 根據當前上下文,計算得到新的上下文 val newContext = newCoroutineContext(context) // 根據啟動模式,建立不同的續體 val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) // 啟動協程 coroutine.start(start, coroutine, block) return coroutine }
newCoroutineContext用於計算新的上下文,程式碼如下:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { // coroutineContext為CoroutineScope中儲存的全域性變數 // 對上下文進行相加 val combined = coroutineContext + context // 用於debug val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined // 如果上下文中沒有排程器,則新增一個預設的排程器 return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }
在不指定協程啟動模式的情況下,協程將按照DEFAULT模式啟動,在上述程式碼中,會呼叫StandaloneCoroutine物件的start方法。StandaloneCoroutine的程式碼如下:
private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<Unit>(parentContext, active) { override fun handleJobException(exception: Throwable): Boolean { handleCoroutineException(context, exception) return true } }
StandaloneCoroutine類中僅重寫了handleJobException方法,用於處理父協程不處理的異常。因此這裡呼叫的start方法實際是父類別AbstractCoroutine的方法,AbstractCoroutine類的start方法程式碼如下:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { // 該方法用於完成父協程與子協程的繫結關聯,同時確保父協程啟動 initParentJob() // 該方法的寫法等同於start.invoke(block, receiver, this) // 因此呼叫的CoroutineStart類的方法 start(block, receiver, this) }
AbstractCoroutine類的start方法內,呼叫了CoroutineStart類的invoke方法。
CoroutineStart是一個列舉類,用於根據不同的啟動模式去啟動協程,程式碼如下:
public enum class CoroutineStart { // 四種啟動模式 DEFAULT, LAZY, // 具有實驗性,慎用 @ExperimentalCoroutinesApi ATOMIC, // 具有實驗性,慎用 @ExperimentalCoroutinesApi UNDISPATCHED; // 根據不同的啟動策略,啟動協程,執行block @InternalCoroutinesApi public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // 該模式不主動啟動,等待使用者呼叫start方法 } // 根據不同的啟動策略,啟動協程,執行block @InternalCoroutinesApi public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(receiver, completion) ATOMIC -> block.startCoroutine(receiver, completion) UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) LAZY -> Unit } // 當前的啟動模式是否為懶啟動 @InternalCoroutinesApi public val isLazy: Boolean get() = this === LAZY }
CoroutineStart類中有兩個invoke方法,其中一個引數中有receiver,另一個沒有receiver。在Kotlin協程中,很多方法都過載了帶有receiver的方法和不帶有receiver的方法。
receiver用於為block執行提供一個環境。Kotlin中提供的啟動協程的方法都是通過帶receiver引數的start方法實現。通過receiver環境,可以更方便的實現一些操作,比如在launch啟動的協程中再次呼叫launch啟動新的協程。在沒有receiver的環境下執行block,則更像是在suspend方法中執行,如果需要啟動其他的協程,需要自己提供環境。
startCoroutineCancellable是一個擴充套件方法,用來建立一個可以取消的協程,程式碼如下:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = runSafely(completion) { // createCoroutineUnintercepted:建立協程 // intercepted:攔截排程 // resumeCancellableWith:恢復執行 createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit)) } // 如果建立的過程發生異常,則通知續體恢復後續程式碼的執行 private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) { try { block() } catch (e: Throwable) { completion.resumeWith(Result.failure(e)) } }
createCoroutineUnintercepted方法用於建立一個新的、可掛起的、不受干擾的協程。
public expect fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( receiver: R, completion: Continuation<T> ): Continuation<Unit>
在Kotlin中有很多被expect關鍵字標記的介面方法,需要找到對應平臺下被actual標記的實現方法。
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( receiver: R, completion: Continuation<T> ): Continuation<Unit> { // 用於debug val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) create(receiver, probeCompletion) else { createCoroutineFromSuspendFunction(probeCompletion) { (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it) } } }
createCoroutineUnintercepted方法建立的協程需要手動呼叫resumeWith方法才可以啟動,但重複的呼叫resumeWith方法可能會導致狀態機發生異常。同時,引數中傳入的completion可能會在任意的上下文中被呼叫。
正常情況下,我們編寫的lambda表示式——block,在編譯器編譯時,會自動生成一個類,並繼承SuspendLambda類,實現Continuation等介面。因為SuspendLambda繼承自ContinuationImpl,ContinuationImpl繼承自BaseContinuationImpl,所以才有了上述程式碼中的判斷邏輯。
如果當前的block物件的型別為BaseContinuationImpl,則呼叫create方法,這裡的create方法是編譯器生成的類裡的重寫方法,它的內部就是通過我們傳入的引數,建立並返回根據blcok生成的類的一個範例物件。
如果當前的block物件的型別不為BaseContinuationImpl,則需要通過createCoroutineFromSuspendFunction方法建立協程。這裡假設lambda表示式的型別不是BaseContinuationImpl。
該方法用於在createCoroutineUnintercepted方法中使用,當一個被suspend修飾的lambda表示式沒有繼承BaseContinuationImpl類時,則通過此方法建立協程。
有兩種情況會呼叫該方法建立協程:第一種情況是lambda表示式中呼叫了其他的掛起方法;第二種情況是掛起方法是通過Java實現的。
createCoroutineFromSuspendFunction方法的程式碼如下:
private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any? ): Continuation<Unit> { val context = completion.context // 如果上下文為空 return if (context === EmptyCoroutineContext) // 建立一個受限協程 object : RestrictedContinuationImpl(completion as Continuation<Any?>) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() block(this) } 1 -> { label = 2 result.getOrThrow() } else -> error("This coroutine had already completed") } } else // 不為空,則建立一個正常的協程 object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() block(this) } 1 -> { label = 2 result.getOrThrow() } else -> error("This coroutine had already completed") } } }
受限協程是指協程在執行過程中的,只能呼叫協程作用域中提供的掛起方法發生掛起,其他掛起方法不能呼叫,因為在掛起方法會對續體進行攔截,可能導致後續程式碼的執行變得無法預測。
典型的例子就是sequence方法,它建立的協程就是受限協程,只能通過呼叫yield方法或者yieldAll方法才能發生掛起。由於受限協程中不能進行協程排程,因此其上下文是空的。
這裡launch方法的上下文有一個預設排程器,因此會建立一個ContinuationImpl物件。
到這裡,協程完成了建立。
再次回到startCoroutineCancellable方法,當呼叫createCoroutineUnintercepted建立好協程後,會呼叫intercepted方法,程式碼如下:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
intercepted方法是Continuation介面的擴充套件方法,內部呼叫了ContinuationImpl類的intercepted方法。
internal abstract class ContinuationImpl( completion: Continuation<Any?>?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { constructor(completion: Continuation<Any?>?) : this(completion, completion?.context) public override val context: CoroutineContext get() = _context!! @Transient private var intercepted: Continuation<Any?>? = null // 如果沒有快取,則從上下文中獲取攔截器,呼叫interceptContinuation進行攔截, // 將攔截的續體儲存到全域性變數 public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } protected override fun releaseIntercepted() { val intercepted = intercepted if (intercepted != null && intercepted !== this) { context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted) } this.intercepted = CompletedContinuation // just in case } }
這裡的ContinuationInterceptor指的就是在newCoroutineContext方法中傳入的Dispatchers.Default排程器。CoroutineDispatcher類的interceptContinuation方法的程式碼如下:
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { ... // 將續體包裹成DispatchedContinuation,並傳入當前排程器 public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) ... }
再次回到startCoroutineCancellable方法,當呼叫intercepted方法進行攔截後,會呼叫resumeCancellableWith方法,程式碼如下:
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) }
由於當前的Continuation物件的型別為DispatchedContinuation,因此呼叫DispatchedContinuation類的resumeCancellableWith方法,程式碼如下:
internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation { ... @Suppress("NOTHING_TO_INLINE") inline fun resumeCancellableWith(result: Result<T>) { val state = result.toState() // 是否進行排程 if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE // 進行排程 dispatcher.dispatch(context, this) } else {// Dispatcher.Unconfined排程器會走這裡 executeUnconfined(state, MODE_CANCELLABLE) { // 協程未被取消 if (!resumeCancelled()) { // 恢復執行 resumeUndispatchedWith(result) } } } } // 恢復執行前判斷協程是否已經取消執行 @Suppress("NOTHING_TO_INLINE") inline fun resumeCancelled(): Boolean { // 獲取當前的協程任務 val job = context[Job] // 如果不為空且不活躍 if (job != null && !job.isActive) { // 丟擲異常 resumeWithException(job.getCancellationException()) return true } return false } @Suppress("NOTHING_TO_INLINE") inline fun resumeUndispatchedWith(result: Result<T>) { // 該方法在指定的上下文中執行,在執行後同步協程上下文變化 withCoroutineContext(context, countOrElement) { // 呼叫續體的resumeWith方法 continuation.resumeWith(result) } } ... } // Dispatchers.Unconfined模式下的排程 private inline fun DispatchedContinuation<*>.executeUnconfined( contState: Any?, mode: Int, doYield: Boolean = false, block: () -> Unit ): Boolean { // 從ThreadLocal中獲取EventLoop val eventLoop = ThreadLocalEventLoop.eventLoop // doYield表示是否正在讓出執行 // 如果正在讓出執行,並且執行佇列還是空的,說明不需要執行,返回false if (doYield && eventLoop.isUnconfinedQueueEmpty) return false // 如果EventLoop當前還在被Unconfined排程器使用 return if (eventLoop.isUnconfinedLoopActive) { _state = contState resumeMode = mode // 向佇列中新增當前的任務 eventLoop.dispatchUnconfined(this) // 返回 true true } else { // 重新執行EventLoop runUnconfinedEventLoop(eventLoop, block = block) // 返回false false } }
runUnconfinedEventLoop方法是一個擴充套件方法,用於啟動EventLoop,程式碼如下:
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( eventLoop: EventLoop, block: () -> Unit ) { // 參照計數+1 eventLoop.incrementUseCount(unconfined = true) try { // 先執行當前的任務 block() // 迴圈分發任務 while (true) { // 全部執行完畢,則退出分發 if (!eventLoop.processUnconfinedEvent()) break } } catch (e: Throwable) { handleFatalException(e, null) } finally { // 參照計數+1 eventLoop.decrementUseCount(unconfined = true) } }
Dispatchers.Default排程器與Dispatchers.Unconfined排程器的排程邏輯基本都相同,最終都是呼叫Contination物件的resumeWith方法,同時傳入Result物件作為引數。
這裡的Contination是createCoroutineUnintercepted方法建立的繼承ContinuationImpl的匿名內部類物件。Result是resumeCancellableWith方法傳入的Result.success(Unit)物件,因為首次啟動,所以傳入型別為Unit。
呼叫匿名內部類的resumeWith方法,實際呼叫的是父類別BaseContinuationImpl的resumeWith方法。
BaseContinuationImpl類的resumeWith方法的程式碼如下:
internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { public final override fun resumeWith(result: Result<Any?>) { var current = this var param = result // 迴圈 while (true) { // 用於debug probeCoroutineResumed(current) // current環境下 with(current) { // completion用於續體執行完的回撥,為空,則丟擲異常 // 這裡的completion就是一開始建立的StandaloneCoroutine物件 val completion = completion!! // 獲取執行後的結果 val outcome: Result<Any?> = try { // 核心執行 val outcome = invokeSuspend(param) // 如果返回值為COROUTINE_SUSPENDED,說明協程掛起,退出迴圈 if (outcome === COROUTINE_SUSPENDED) return // 返回結果成功 Result.success(outcome) } catch (exception: Throwable) { // 返回結果失敗 Result.failure(exception) } // 釋放攔截的續體,狀態機終止 releaseIntercepted() // 這裡沒有直接呼叫resume,而是通過迴圈代替遞迴 // 這也是resumeWith方法宣告為final的原因 if (completion is BaseContinuationImpl) { // 這種情況一般為多個suspend方法按順序執行 // 等待下一次迴圈 current = completion param = outcome } else { // 返回結果 completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend(result: Result<Any?>): Any? protected open fun releaseIntercepted() { // does nothing here, overridden in ContinuationImpl } public open fun create(completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Continuation) has not been overridden") } public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden") } ... }
在上述程式碼中,resumeWith方法內部呼叫了invokeSuspend方法,這裡的invokeSuspend方法實際就是createCoroutineFromSuspendFunction方法中建立的匿名內部類的invokeSuspend方法。匿名內部類的程式碼如下:
object : ContinuationImpl(completion as Continuation<Any?>, context) { // 初始狀態 private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 // 先去獲取一次結果,如果有異常,則直接丟擲,避免執行 // 比如在排程器中,如果發現協程已經取消, // 則呼叫resumeWithException方法,在這裡直接被丟擲 result.getOrThrow() // 把當前續體傳入,執行協程 // 可能發生掛起 block(this) } 1 -> { // 如果協程發生了掛起,那麼恢復掛起後會走到這裡 label = 2 // 獲取最終的執行結果 result.getOrThrow() } else -> error("This coroutine had already completed") } }
通過上述程式碼的分析,協程的掛起實際就是在協程返回結果時返回一個COROUTINE_SUSPENDED物件,在收到COROUTINE_SUSPENDED結果後直接返回,等待被再次呼叫resumeWith恢復。
COROUTINE_SUSPENDED物件定義在列舉類CoroutineSingletons中,程式碼如下:
internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
該列舉類代表了協程的三個狀態,協程在建立後狀態為UNDECIDED,如果執行過程中發生掛起,則狀態變為COROUTINE_SUSPENDED,最後掛起恢復後狀態變為RESUMED。
而協程的恢復實際就是在掛起方法執行完成後,通過呼叫協程執行時傳入的續體的resumeWith方法,恢復後續程式碼的執行。
到此這篇關於Kotlin協程操作之建立啟動掛起恢復詳解的文章就介紹到這了,更多相關Kotlin協程操作內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45