<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
job啟動流程,我們先從一段最簡單使用協程的程式碼開始,進行程式碼跟跟蹤,順便引出幾個關鍵的概念,在後面章節裡面去單獨分析。程式碼如下:
private fun testParentChildJob() { val coroutineContext = Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) } val myScope = CoroutineScope(coroutineContext) val job = myScope.launch { println("myScope.launch :") } }
首先建立一個有四種元素的上下文域myScope,由Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) }
組成,上一章coroutineContext篇已經講過plus操作的過程了,不贅述。
接著用這個作用域myScope開啟一個協程,協程內列印println("myScope.launch :")
。
我自己從launch函數一步一步跟蹤後,得到了如下圖所示的流程:
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
分為三步:
首先使用入參context: CoroutineContext = EmptyCoroutineContext,
建立一個新的上下文集合newCoroutineContext(context)
,newCoroutineContext函數操作:就是根據所在的scope域的上下文集合和入參進行組合操作,得到一個新的上下文集合,程式碼如下:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }
可以看到各種+
操作,就是coroutineContext的各種plus操作,可以得到一個繼承自所在scope域的上下文集合(這個域由coroutineContext變數決定,這個變數屬於CoroutineScope成員),並且包含了入參的context元素,這樣上下文集合就具有繼承性,並且自己還可以對已有元素進行覆蓋。上一篇coroutineContext篇已經講過,就不贅述了。
由於我們使用的預設方式launch的,使用上面建立的newContext元素集合,就會建立一個StandaloneCoroutine(newContext, active = true)
協程物件。這個物件繼承關係比較複雜,繼承關係如下:
這個類裡面包含了很多成員變數,原始碼如下:
private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<Unit>(parentContext, active) { //省略。。。 } public abstract class AbstractCoroutine<in T>( /** * The context of the parent coroutine. */ @JvmField protected val parentContext: CoroutineContext, active: Boolean = true ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { /** * The context of this coroutine that includes this coroutine as a [Job]. */ @Suppress("LeakingThis") public final override val context: CoroutineContext = parentContext + this /** * The context of this scope which is the same as the [context] of this coroutine. */ public override val coroutineContext: CoroutineContext get() = context override val isActive: Boolean get() = super.isActive }
context成員變數是外部傳進來的newContext上下文集合 + this
得到的,那麼newContext的Job元素會被this替換掉;
coroutineContext成員變數是CoroutineScope介面的成員,覆寫為context物件; isActive標誌這個Job是否是存活狀態; 呼叫剛剛建立的coroutine協程的start方法,coroutine.start(start, coroutine, block)
,跟進去看看
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this) }
initParentJob()
方法主要是用於關聯父子Job的,這裡先不講,對啟動流程沒啥影響。
start(block, receiver, this)
是正真啟動協程的地方,CoroutineStart
的值是DEFAULT
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // will start lazily }
那麼呼叫的就是DEFAULT -> block.startCoroutineCancellable(completion)
這個分支,
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) } public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit> { //省略。。。 createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any?>).invoke(it) } } private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any? ): Continuation<Unit> { val context = completion.context // label == 0 when coroutine is not started yet (initially) or label == 1 when it was return if (context === EmptyCoroutineContext) //省略。。。 else object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith block(this) // run the block, may return or suspend } //省略。。。 } } }
第一步:createCoroutineUnintercepted(completion)
就是以completion作為引數建立一個ContinuationImpl物件,這個completion就是上面建立的StandaloneCoroutine物件。這個新的ContinuationImpl物件是繼承自Continuation,那麼他就有fun resumeWith(result: Result<T>)
方法,該方法是用於恢復掛起點,val context: CoroutineContext
引數,這個引數就是Continuation的所關聯的上下文集合。
我們再自己看看這個createCoroutineFromSuspendFunction
這個方法,發現將我們launch{}的lambda引數進行包裝後(this as Function1<Continuation<T>, Any?>).invoke(it)
然後作為入參block,這個block作為ContinuationImpl物件覆寫的invokeSuspend函數的回撥函數。那麼可以從這個看出一個關係:
ContinuationImpl.invokeSuspend -> launch入參的lambda函數體
第二步:就是呼叫ContinuationImpl .intercepted()
,內部處理是獲取ContinuationImpl的上下文集合中的ContinuationInterceptor元素,然後將ContinuationImpl作為引數,包裝成DispatchedContinuation(this, continuation)
,其中this代表ContinuationInterceptor也就是dispatcher,continuation代表剛剛傳遞進來的ContinuationImpl。
第三步:resumeCancellableWith(Result.success(Unit))
,呼叫DispatchedContinuation的resumeCancellableWith函數,程式碼如下:
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) } //DispatchedContinuation extends DIspatchedTask inline fun resumeCancellableWith(result: Result<T>) { val state = result.toState() if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { 省略。。。 } } //DIspatchedTask public final override fun run() { val taskContext = this.taskContext var fatalException: Throwable? = null try { val delegate = delegate as DispatchedContinuation<T> val continuation = delegate.continuation withCoroutineContext(context, delegate.countOrElement) { //省略。。。 if (exception == null && job != null && !job.isActive) { //省略。。。 } else { if (exception != null) continuation.resumeWithException(exception) else continuation.resume(getSuccessfulResult(state)) } } } catch (e: Throwable) { //省略。。。 } }
由於DispatchedContinuation是繼承自DIspatchedTask的,所以DispatchedContinuation的run方法是DIspatchedTask已經實現的了,所以dispatcher.dispatch(context, this)
,dispatcher呼叫的是DIspatchedTask.run方法,(dispatcher是一個執行緒池和java執行緒池類似,但是有一點區別,後面章節再講),run方法中,首先獲取delegate,然後取出continuation變數,這個delegate其實是被DispatchedContinuation覆寫的,而且實現的Continuation介面被建構函式的continuation代理,這個入參continuation其實就是ContinuationImpl,上一步分析過了。
internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation { //省略。。。 override val delegate: Continuation<T> get() = this //省略。。。 } //Continuation public inline fun <T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value))
那麼其實就是呼叫的ContinuationImpl.resumeWith(Result.success(value))
方法,ContinuationImpl繼承自BaseContinuationImpl,繼續進去看看
public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var param = result while (true) { //省略。。。 with(current) { try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } //省略。。。 releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { //省略。。。 } else { // top-level completion reached -- invoke and return completion.resumeWith(outcome) return } } } }
第一步,呼叫val outcome = invokeSuspend(param)
,上面已經分析了,invokeSuspend被ContinuationImpl覆寫了,內部回撥了launch的lambda表示式;
第二步,呼叫completion.resumeWith(outcome)
,這個completion上面分析了,是StandAloneCoroutine協程,呼叫了StandAloneCoroutine物件的resumeWith方法,這個方法裡面用於更新協程狀態,比如協程成功,失敗之類的。
綜上,通過上面的invokeSuspend函數呼叫,最終呼叫到了launch的lambda表示式,也就是我們業務程式碼,我們的業務程式碼是被封裝到了ContinuationImpl類中。
通過上面的分析,一共發現了三種不同型別的continuation,它們分別是:
DispatchedContinuation用於分發continuation到指定的執行緒池中; ContinuationImpl用於包裝launch的lambda程式碼塊作為業務程式碼代理類; StandAloneCoroutine協程管理類管理Job生命週期以及協程的狀態父子Job關係維護等等。
它們的呼叫鏈如下:
父子Job關聯操作是在上面launch流程中的,在呼叫start方法的時候進行關聯的:
initParentJob方法裡面,先呼叫parent.start方法,確保parent的Job已經啟動了,接著呼叫parent.attachChild(this)方法,用於關聯父子Job。 程式碼如下:
//AbstractCoroutine internal fun initParentJob() { //取出上下文集合中的Job元素,呼叫initParentJobInternal方法 initParentJobInternal(parentContext[Job]) } //AbstractCoroutine internal fun initParentJobInternal(parent: Job?) { //省略。。。 parent.start() // make sure the parent is started //省略。。。 val handle = parent.attachChild(this) parentHandle = handle //省略。。。 if (isCompleted) { handle.dispose() } }
首先取出parentContext[Job]的Job元素,這個parentContext就是launch的時候根據scope的上下文集合建立出來的上下文集合,取出的Job元素就是父Job,作為initParentJobInternal的引數,接著呼叫parent.attachChild(this):
//JobSupport public final override fun attachChild(child: ChildJob): ChildHandle { /* * Note: This function attaches a special ChildHandleNode node object. This node object * is handled in a special way on completion on the coroutine (we wait for all of them) and * is handled specially by invokeOnCompletion itself -- it adds this node to the list even * if the job is already cancelling. For cancelling state child is attached under state lock. * It's required to properly wait all children before completion and provide linearizable hierarchy view: * If child is attached when the job is already being cancelled, such child will receive immediate notification on * cancellation, but parent *will* wait for that child before completion and will handle its exception. */ return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle } //JobSupport internal class ChildHandleNode( parent: JobSupport, @JvmField val childJob: ChildJob ) : JobCancellingNode<JobSupport>(parent), ChildHandle { override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) override fun toString(): String = "ChildHandle[$childJob]" }
首先建立了一個handler = ChildHandleNode(this, child).asHandler物件,這個物件ChildHandleNode作為引數傳遞給invokeOnCompletion,然後返回一個ChildHandle型別的物件,賦值給子Job的parentHandle val handle = parent.attachChild(this); parentHandle = handle
,parentHandle 這個是子Job持有的變數,ChildHandle介面擁有childCancelled方法,用於子Job通知父Job,子Job已經取消了,父Job需要根據子Job狀態繼續進行處理。
//JobSupport public final override fun invokeOnCompletion( onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler ): DisposableHandle { var nodeCache: JobNode<*>? = null loopOnState { state -> when (state) { is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) { // try move to SINGLE state val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (_state.compareAndSet(state, node)) return node } //省略。。。 } is Incomplete -> { val list = state.list if (list == null) { // SINGLE/SINGLE+ promoteSingleToNodeList(state as JobNode<*>) } else { var rootCause: Throwable? = null var handle: DisposableHandle = NonDisposableHandle val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (!addLastAtomic(state, list, node)) return@loopOnState // retry if (rootCause == null) return node //省略。。。 if (rootCause != null) { //省略。。。 } else { val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (addLastAtomic(state, list, node)) return node } } } else -> { // is complete //省略。。。 } } } }
invokeOnCompletion方法就是,將傳遞進來的handler: CompletionHandler,分情況儲存起來,
當state狀態是Empty狀態,建立一個代理節點node ,之後存入到state中; 當state是Incomplete狀態,如果state.list結構是空的,那麼建立一個連結串列,將node 節點作為第一個節點存進去,當前state.list不為空,那麼將node節點插入到連結串列的末尾。 這樣經過上面這兩步: 子Job持有的parentHandle物件可以通知父Job自己已經取消了:
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
父Job持有的state物件儲存著包裝著子Job的ChildHandleNode物件,父Job通過遍歷呼叫列表中的node元素的invoke方法,即可取消所有的子Job:
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
會發現, 呼叫launch生成一個Job,這個Job就會initParentJob() ,進而子Job會持有父Job,父Job也會將子Job加入到state的資料結構中,進而形成了樹的結構,類似於下圖:
父子Job都可以互相通知對方自己已經取消,需要做出對應的處理。
launch啟動一個協程,會生成三個continuation,分別是
DispatchedContinuation用於分發continuation到指定的執行緒池中; ContinuationImpl用於包裝launch的lambda程式碼塊作為業務程式碼代理類; StandAloneCoroutine協程管理類管理Job生命週期以及協程的狀態父子Job關係維護等等。 呼叫鏈:DispatchedContinuation -> ContinuationImpl(在這裡呼叫launch的lambda業務程式碼塊) -> StandAloneCoroutine
launch啟動一個協程Job,這個Job所在域如果存在parentJob ,那麼parentJob和Job會形成樹結構上的父子節點,並且子Job繼承了父Job的CoroutineScope的上下文集合(根據引數會覆蓋一些重複Key的元素)。
到此這篇關於Kotlin Job啟動流程原始碼層深入分析的文章就介紹到這了,更多相關Kotlin Job啟動流程內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45