首頁 > 軟體

簡單有效上手Python3非同步asyncio問題

2023-11-04 06:00:42

Python3非同步asyncio問題

官方檔案:

https://docs.python.org/zh-cn/3/library/asyncio-task.html

看了一大堆相關的資料和教學,針對的Python版本不同,寫法也各不一致,翻了翻官方的檔案,發現其實越高版本的Python對非同步進行封裝的越方便,官方說法叫高層級API,甚至都不用去理解什麼Futuretaskloop之類的概念了,我現在用的是Python 3.7.5,可以這樣很簡單的實現阻塞等待非同步並行:如果沒有複雜需求的話,用高層級API就可以了。

如果涉及到回撥的話貌似還得用低層級的API,後面單獨記錄。

import asyncio


async def first():
    print('first函數呼叫開始,下面將會等待3秒模擬函數執行完畢')
    await asyncio.sleep(3)
    print('first函數執行完畢')


async def last():
    print('last函數呼叫開始')
    await asyncio.sleep(2)
    print('last函數執行完畢')


async def func(delay):
    print('開始非同步同時執行的函數+延遲: ' + str(delay))
    await asyncio.sleep(delay)
    print('--非同步函數執行完畢+延遲: ' + str(delay))


async def main():
    await first()  # 這裡先呼叫first()函數,並且等它執行完了才會開始
    await asyncio.gather(
        func(1),
        func(2),
        func(3)
    )
    await last()


asyncio.run(main())

上面程式碼實際執行的過程是:

  • 開始執行first()函數
  • first()執行完畢後開始並行執行下面gather中加入的
  • 三個函數(任務)三個函數全部並行執行完畢後執行last()

官方檔案中給的建議是隻建立一個主入口的main()函數(當然這個函數名可以自定義的),將要呼叫的其他函數都放在這個函數中,然後再使用asyncio.run()啟動,理想情況下應當只被呼叫一次.

上圖:

更新

上面所謂的高階層API用法最後一行asyncio.run(main())和下面使用低階層API實現效果是一樣的:

loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_until_complete(task)

下面是學習過程中記錄的偏低層實現的資料

最基本的定義和應用

import asyncio

# 定義一個可以非同步呼叫的函數,其型別為coroutine
async def func1():
    pass

if __name__ == '__main__':
    loop = asyncio.get_event_loop()	# 定義一個用來回圈非同步函數的loop物件
    task = asyncio.ensure_future(func1())	# 建立一個呼叫非同步函數的任務,task型別為future
    # task = loop.create_task(func1())	# 使用loop的.create_task()建立任務,效果一樣
    loop.run_until_complete(task)	# 開始在loop迴圈中執行非同步函數,直到該函數執行完畢

asyncio.ensure_future(coroutine)loop.create_task(coroutine)都可以建立一個taskrun_until_complete的引數是一個futrue物件。

當傳入一個協程,其內部會自動封裝成task,task是Future的子類。

isinstance(task, asyncio.Future)將會輸出True。


future型別的任務可以在loop.run_until_complete中執行,也可以直接用await+任務變數名阻塞?呼叫

什麼時候使用非同步

import asyncio

# 這是一個耗時很少的非同步函數
async def msg(text):	
    await asyncio.sleep(0.1)
    print(text)

# 這是一個耗時較長的非同步函數
async def long_operation():
    print('long_operation started')
    await asyncio.sleep(3)
    print('long_operation finished')

# 主函數部分,同樣需要宣告為async非同步型別
async def main():
    await msg('first')
    # 現在需要呼叫一個耗時較長的函數操作,不希望阻塞的等待它執行完畢
    # 希望long_operation在開始執行後,立即呼叫msg,這裡就可以將long_operation封裝到task任務中
    task = asyncio.ensure_future(long_operation())
    await msg('second')
    # 開始task中的long_operation函數
    await task
    # task執行完畢後會繼續下面的程式碼
    print('All done.')

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

執行結果:

並行和並行

並行和並行一直是容易混淆的概念。並行通常指有多個任務需要同時進行,並行則是同一時刻有多個任務執行。

用上課來舉例就是,並行情況下是一個老師在同一時間段輔助不同的人功課。

並行則是好幾個老師分別同時輔助多個學生功課。

簡而言之就是一個人同時吃三個饅頭還是三個人同時分別吃一個的情況,吃一個饅頭算一個任務。

asyncio實現並行,就需要多個協程來完成任務,每當有任務阻塞的時候就await,然後其他協程繼續工作。

建立多個協程的列表,然後將這些協程註冊到事件迴圈中。

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task ret: ', task.result())

print('TIME: ', now() - start)

結果如下

Waiting:  1
Waiting:  2
Waiting:  4
Task ret:  Done after 1s
Task ret:  Done after 2s
Task ret:  Done after 4s
TIME:  4.003541946411133

總時間為4s左右。4s的阻塞時間,足夠前面兩個協程執行完畢。如果是同步順序的任務,那麼至少需要7s。此時我們使用了aysncio實現了並行。

asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個task列表,後者接收一堆task。

非同步結果回撥

找了個別人寫的例子,大致理解了下實現過程:

  • 定義async非同步函數
  • 定義普通函數用於處理回撥
  • 獲取非同步函數的coroutine協程物件(其實就是不帶await修飾直接執行非同步函數返回的那個物件)
  • 獲取loop迴圈物件
  • 使用低階層API手動建立task任務
  • task任務物件註冊回撥函數
  • 啟動loop迴圈呼叫非同步函數
import time
import asyncio

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(future):
    print('Callback: ', future.result())

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
#task = asyncio.ensure_future(coroutine)	# 貌似和上面用loop建立任務效果一樣
task.add_done_callback(callback)
loop.run_until_complete(task)

print('TIME: ', now() - start)

這裡需要注意,在使用低層級API手動建立非同步任務的時候,不能同時使用高層級API的簡單操作了,比如這裡建立task任務物件的時候,就不能用asyncio.create_task(),否則會找不到loop物件,返回下面的錯誤

RuntimeError: no running event loop

翻了一下asynciotasks.py原始碼,原來所謂的高層級API就是這麼簡單的封裝啊…

呼叫的時候在函數內部又定義了一遍loop

def create_task(coro):
    """Schedule the execution of a coroutine object in a spawn task.

    Return a Task object.
    """
    loop = events.get_running_loop()
    return loop.create_task(coro)

我自己寫的例子

建立4個非同步任務同時開始執行,每個任務執行完成後將結果追加到result_list陣列變數中.

import asyncio

result_list = []


async def fun(var):
    return var + 1


def callbackFun(future):
    result_list.append(future.result())


task_list = []

for i in range(1, 5):
    cor = fun(i)
    task = asyncio.ensure_future(cor)
    task.add_done_callback(callbackFun)
    task_list.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task_list))
print(result_list)

當然,如果不需要使用回撥函數,而是等所有提交的非同步任務都執行完成後獲取它們的結果,可以這樣寫:

# 前面省略
loop.run_until_complete(asyncio.wait(task_list))
# task_list中的每個任務執行完成後可以呼叫它的.result()方法來獲取結果
for task in task_list:
    print('每個task執行完的結果:', task.result())

總結

以上為個人經驗,希望能給大家一個參考,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com