<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
上一遍文章《Python中Async語法協程的實現》介紹了Python是如何以生成器來實現協程的以及Python Asyncio通過Future和Task的封裝來實現協程的排程,而在Python Asyncio之中Coroutines, Tasks和Future都屬於可等待物件,在使用的Asyncio的過程中,經常涉及到三者的轉換和排程,開發者容易在概念和作用上犯迷糊,本文主要闡述的是三者之間的關係以及他們的作用。
協程是執行緒中的一種特例,協程的入口和切換都是靠事件迴圈來排程的,在新版的Python
中協程的入口是Asyncio.run
,當程式執行到Asyncio.run
後,可以簡單的理解為程式由執行緒模式切換為協程模式(只是方便理解,對於計算機而言,並沒有這樣區分),
以下是一個最小的協程例子程式碼:
import asyncio async def main(): await asyncio.sleep(0) asyncio.run(main())
在這段程式碼中,main
函數和asyncio.sleep
都屬於Coroutine,main
是通過asyncio.run
進行呼叫的,接下來程式也進入一個協程模式,asyncio.run
的核心呼叫是Runner.run
,它的程式碼如下:
class Runner: ... def run(self, coro, *, context=None): """Run a coroutine inside the embedded event loop.""" # 省略程式碼 ... # 把coroutine轉為task task = self._loop.create_task(coro, context=context) # 省略程式碼 ... try: # 如果傳入的是Future或者coroutine,也會專為task return self._loop.run_until_complete(task) except exceptions.CancelledError: # 省略程式碼 ...
這段程式碼中刪去了部分其它功能和初始化的程式碼,可以看到這段函數的主要功能是通過loop.create_task方法把一個Coroutine物件轉為一個Task物件,然後通過loop.run_until_complete等待這個Task執行結束。
可以看到,Asycnio
並不會直接去排程Coroutine,而是把它轉為Task再進行排程,這是因為在Asyncio
中事件迴圈的最小排程物件就是Task。不過在Asyncio
中並不是所有的Coroutine的呼叫都會先被轉為Task物件再等待,比如範例程式碼中的asyncio.sleep
,由於它是在main
函數中直接await的,所以它不會被進行轉換,而是直接等待,通過呼叫工具分析展示的圖如下:
在這個圖示中,從main
函數到asyncio.sleep
函數中沒有明顯的loop.create_task
等把Coroutine轉為Task呼叫,這裡之所以不用進行轉換的原因不是做了一些特殊優化,而是本因如此, 這個await asyncio.sleep
函數實際上還是會被main
這個Coroutine轉換成的Task
繼續排程到。
在瞭解Task
的排程原理之前,還是先回到最初的呼叫範例,看看直接用Task呼叫和直接用Coroutine呼叫的區別是什麼。
如下程式碼,我們顯示的執行一個Coroutine轉為Task的操作再等待,那麼程式碼會變成下面這樣:
import asyncio async def main(): await asyncio.create_task(asyncio.sleep(0)) asyncio.run(main())
這樣的程式碼看起來跟最初的呼叫範例很像,沒啥區別,但是如果進行一些改變,比如增加一些休眠時間和Coroutine的呼叫,就能看出Task物件的作用了,現在編寫兩份檔案,
他們的程式碼如下:
# demo_coro.py import asyncio import time async def main(): await asyncio.sleep(1) await asyncio.sleep(2) s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 3.0028765201568604 # demo_task.py import asyncio import time async def main(): task_1 = asyncio.create_task(asyncio.sleep(1)) task_2 = asyncio.create_task(asyncio.sleep(2)) await task_1 await task_2 s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 2.0027475357055664
其中demo_coro.py
進行了兩次await
呼叫,程式的執行總時長為3秒,而demo_task.py
則是先把兩個Coroutine物件轉為Task物件,然後再進行兩次await
呼叫,程式的執行總時長為2秒。可以發現,demo_task.py
的執行時長近似於其中執行最久的Task物件時長,而demo_coro.py
的執行時長則是近似於兩個Coroutine物件的總執行時長。
之所以會是這樣的結果,是因為直接await
Coroutine物件時,這段程式會一直等待,直到Coroutine物件執行完畢再繼續往下走,而Task物件的不同之處就是在建立的那一刻,就已經把自己註冊到事件迴圈之中等待被安排執行了,然後返回一個task物件供開發者等待,由於asyncio.sleep
是一個純IO型別的呼叫,所以在這個程式中,兩個asyncio.sleep
Coroutine被轉為Task從而實現了並行呼叫。
上述的程式碼之所以通過Task能實現並行呼叫,是因為Task中出現了一些與事件迴圈互動的函數,正是這些函數架起了Coroutine並行呼叫的可能, 不過Task是Future的一個子物件,所以在瞭解Task之前,需要先了解Future。
與Coroutine只有讓步和接收結果不同的是Future除了讓步和接收結果功能外,它還是一個只會被動進行事件呼叫且帶有狀態的容器,它在初始化時就是Pending
狀態,這時可以被取消,被設定結果和設定異常。而在被設定對應的操作後,Future會被轉化到一個不可逆的對應狀態,並通過loop.call_sonn
來呼叫所有註冊到本身上的回撥函數,同時它帶有__iter__
和__await__
方法使其可以被await
和yield from
呼叫,它的主要程式碼如下:
class Future: ... def set_result(self, result): """設定結果,並安排下一個呼叫""" if self._state != _PENDING: raise exceptions.InvalidStateError(f'{self._state}: {self!r}') self._result = result self._state = _FINISHED self.__schedule_callbacks() def set_exception(self, exception): """設定異常,並安排下一個呼叫""" if self._state != _PENDING: raise exceptions.InvalidStateError(f'{self._state}: {self!r}') if isinstance(exception, type): exception = exception() if type(exception) is StopIteration: raise TypeError("StopIteration interacts badly with generators " "and cannot be raised into a Future") self._exception = exception self._state = _FINISHED self.__schedule_callbacks() self.__log_traceback = True def __await__(self): """設定為blocking,並接受await或者yield from呼叫""" if not self.done(): self._asyncio_future_blocking = True yield self # This tells Task to wait for completion. if not self.done(): raise RuntimeError("await wasn't used with future") return self.result() # May raise too. __iter__ = __await__ # make compatible with 'yield from'.
單看這段程式碼是很難理解為什麼下面這個future被呼叫set_result
後就能繼續往下走:
async def demo(future: asyncio.Future): await future print("aha")
這是因為Future跟Coroutine一樣,沒有主動排程的能力,只能通過Task和事件迴圈聯手被排程。
Task是Future的子類,除了繼承了Future的所有方法,它還多了兩個重要的方法__step
和__wakeup
,通過這兩個方法賦予了Task排程能力,這是Coroutine和Future沒有的,Task的涉及到排程的主要程式碼如下(說明見註釋):
class Task(futures._PyFuture): # Inherit Python Task implementation # from a Python Future implementation. _log_destroy_pending = True def __init__(self, coro, *, loop=None, name=None, context=None): super().__init__(loop=loop) # 省略部分初始化程式碼 ... # 託管的coroutine self._coro = coro if context is None: self._context = contextvars.copy_context() else: self._context = context # 通過loop.call_sonn,在Task初始化後馬上就通知事件迴圈在下次有空的時候執行自己的__step函數 self._loop.call_soon(self.__step, context=self._context) def __step(self, exc=None): coro = self._coro # 方便asyncio自省 _enter_task(self._loop, self) # Call either coro.throw(exc) or coro.send(None). try: if exc is None: # 通過send預激託管的coroutine # 這時候只會得到coroutine yield回來的資料或者收到一個StopIteration的異常 # 對於Future或者Task返回的是Self result = coro.send(None) else: # 傳送異常給coroutine result = coro.throw(exc) except StopIteration as exc: # StopIteration代表Coroutine執行完畢 if self._must_cancel: # coroutine在停止之前被執行了取消操作,則需要顯示的執行取消操作 self._must_cancel = False super().cancel(msg=self._cancel_message) else: # 把執行完畢的值傳送到結果值中 super().set_result(exc.value) # 省略其它異常封裝 ... else: # 如果沒有異常丟擲 blocking = getattr(result, '_asyncio_future_blocking', None) if blocking is not None: # 通過Future程式碼可以判斷,如果帶有_asyncio_future_blocking屬性,則代表當前result是Future或者是Task # 意味著這個Task裡面裹著另外一個的Future或者Task # 省略Future判斷 ... if blocking: # 代表這這個Future或者Task處於卡住的狀態, # 此時的Task放棄了自己對事件迴圈的控制權,等待這個卡住的Future或者Task執行完成時喚醒一下自己 result._asyncio_future_blocking = False result.add_done_callback(self.__wakeup, context=self._context) self._fut_waiter = result if self._must_cancel: if self._fut_waiter.cancel(msg=self._cancel_message): self._must_cancel = False else: # 不能被await兩次 new_exc = RuntimeError( f'yield was used instead of yield from ' f'in task {self!r} with {result!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) elif result is None: # 放棄了對事件迴圈的控制權,代表自己託管的coroutine可能有個coroutine在執行,接下來會把控制權交給他和事件迴圈 # 當前的coroutine裡面即使沒有Future或者Task,但是子Future可能有 self._loop.call_soon(self.__step, context=self._context) finally: _leave_task(self._loop, self) self = None # Needed to break cycles when an exception occurs. def __wakeup(self, future): # 其它Task和Future完成後會呼叫到該函數,接下來進行一些處理 try: # 回收Future的狀態,如果Future發生了異常,則把異常傳回給自己 future.result() except BaseException as exc: # This may also be a cancellation. self.__step(exc) else: # Task並不需要自己託管的Future的結果值,而且如下注釋,這樣能使排程變得更快 # Don't pass the value of `future.result()` explicitly, # as `Future.__iter__` and `Future.__await__` don't need it. # If we call `_step(value, None)` instead of `_step()`, # Python eval loop would use `.send(value)` method call, # instead of `__next__()`, which is slower for futures # that return non-generator iterators from their `__iter__`. self.__step() self = None # Needed to break cycles when an exception occurs.
這份原始碼的Task物件中的__setp
方法比較長,經過精簡後可以發現他主要做的工作有三個:
send
或者throw
來驅動Coroutine進行下一步loop.call_soon
來讓步,把控制權交給事件迴圈單通過原始碼分析可能很難明白, 以下是以兩種Coroutine
的程式碼為例子,簡單的闡述Task與事件迴圈排程的過程,首先是demo_coro
,這個例子中只有一個Task:
# demo_coro.py import asyncio import time async def main(): await asyncio.sleep(1) await asyncio.sleep(2) s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 3.0028765201568604
這個例子中第一步是把main
轉為一個Task,然後呼叫到了對應的__step
方法,這時候__step
方法會會呼叫main()
這個Coroutine的send(None)
方法。
之後整個程式的邏輯會直接轉到main
函數中的await asyncio.sleep(1)
這個Coroutine中,await asyncio.sleep(1)
會先生成一個Future物件,並通過loop.call_at
告訴事件迴圈在1秒後啟用這個Future物件,然後把物件返回。這時候邏輯會重新回到Task的__step
方法中,__step
發現send
呼叫得到的是一個Future物件,所以就在這個Future新增一個回撥,讓Future完成的時候來啟用自己,然後放棄了對事件迴圈的控制權。接著就是事件迴圈在一秒後啟用了這個Future物件,這時程式邏輯就會執行到Future的回撥,也就是Task的__wakeup
方法,於是Task的__step
又被呼叫到了,而這次遇到的是後面的await asyncio.sleep(2)
,於是又走了一遍上面的流程。當兩個asyncio.sleep
都執行完成後,Task的__step
方法裡在對Coroutine傳送一個send(None)
後就捕獲到了StopIteration
異常,這時候Task就會通過set_result
設定結果,並結束自己的排程流程。
可以看到demo_core.py
中只有一個Task在負責和事件迴圈一起排程,事件迴圈的開始一定是一個Task,並通過Task來調起一個Coroutine,通過__step
方法把後續的Future,Task,Coroutine都當成一條鏈來執行,而demo_task.py
則不一樣了,它有兩個Task,程式碼如下:
# demo_task.py import asyncio import time async def main(): task_1 = asyncio.create_task(asyncio.sleep(1)) task_2 = asyncio.create_task(asyncio.sleep(2)) await task_1 await task_2 s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 2.0027475357055664
這個例子中第一步還是跟demo_coro
一樣,但跳轉到main
函數後就開始有區別了,首先在這函數中建立了task1和task2兩個Task,他們分別都會通過__step
方法中的send
啟用對應的asyncio.sleep
Coroutine,然後等待對應的Future來通知自己已經完成了。而對於建立了這兩個Task的main Task來說,通過main
函數的awati task_1
和await task_2
來獲取到他們的“控制權“。首先是通過await task_1
語句,main Task中的__step
方法裡在呼叫send
後得到的是task_1對應的Future,這時候就可以為這個Future新增一個回撥,讓他完成時通知自己,自己再走下一步,對於task_2也是如此。 直到最後兩個task都執行完成,main Task也捕獲到了StopIteration
異常,通過set_result
設定結果,並結束自己的排程流程。
可以看到demo_task.py
與demo_coro.py
有個明顯的區別在於main Task在執行的生命週期中建立了兩個Task,並通過await
託管了兩個Task,同時兩個Task又能實現兩個協程的並行,所以可以發現事件迴圈執行期間,當前協程的並行數永遠小於事件迴圈中註冊的Task數量。此外,如果在main Task中如果沒有顯式的進行await
,那麼子Task就會逃逸,不受main Task管理,如下:
# demo_task.py import asyncio import time def mutli_task(): task_1 = asyncio.create_task(asyncio.sleep(1)) task_2 = asyncio.create_task(asyncio.sleep(2)) async def main(): mutli_task() await asyncio.sleep(1.5) s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 1.5027475357055664
在這段程式碼中,main Task在執行到mutli_task
時,會建立出兩個task,但是在__step
中的coro.send(None)
呼叫得到的結果卻是await asyncio.sleep(1.5)
返回的Future,所以main Task只能呼叫到這個Future的add_don_callback
來裝載自己的__wakeup
方法,最終導致到main Task只能託管到await asyncio.sleep(1.5)
的Future,而mutli_task
建立的task則逃逸了,成為另一條鏈的頂點Task。
不過這個程式的事件迴圈只管理到了main Task
所以事件迴圈會一直執行,直到main Task
執行結束的時候才退出,這時程式會跟著一起退出,所以程式的執行時間只有1.5秒左右。
此外由於另外的Task也是註冊到這個事件迴圈上面,所以事件迴圈會幫忙把task_1執行完畢,而task_2定義的休眠時間是2秒,程式退出之前事件迴圈會發現有個Task尚未執行完畢,於是會對這個Task進行清理並列印一條警報。
在深入了Task,Future的原始碼瞭解後,瞭解了Task和Future在Asyncio
的作用,同時也發現Task和Future都跟loop有一定的耦合,而loop也可以通過一定的方法來建立Task和Future,所以如果要真正的理解到Asyncio
的排程原理,還需要更進入一步,通過Asyncio
的原始碼來了解整個Asyncio
的設計。
到此這篇關於Python Asyncio中Coroutines,Tasks,Future可等待物件的關係及作用的文章就介紹到這了,更多相關Python Asyncio 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45