首頁 > 軟體

python協程與 asyncio 庫詳情

2022-05-16 22:00:21

前言:

python 中協程概念是從 3.4 版本增加的,但 3.4 版本採用是生成器實現,為了將協程和生成器的使用場景進行區分,使語意更加明確,在 python 3.5 中增加了 async 和 await 關鍵字,用於定義原生協程。

1.asyncio 非同步 I/O 庫

python 中的 asyncio 庫提供了管理事件、協程、任務和執行緒的方法,以及編寫並行程式碼的原語,即 async 和 await

該模組的主要內容:

  • 事件迴圈:event_loop,管理所有的事件,是一個無限迴圈方法,在迴圈過程中追蹤事件發生的順序將它們放在佇列中,空閒時則呼叫相應的事件處理者來處理這些事件;
  • 協程:coroutine,子程式的泛化概念,協程可以在執行期間暫停,等待外部的處理(I/O 操作)完成之後,再從暫停的地方繼續執行,函數定義式使用 async關鍵字,這樣這個函數就不會立即執行,而是返回一個協程物件;
  • FutureTaskFuture物件表示尚未完成的計算,Task是 Future的子類,包含了任務的各個狀態,作用是在執行某個任務的同時可以並行的執行多個任務。

非同步函數的定義

非同步函數本質上依舊是函數,只是在執行過程中會將執行權交給其它協程,與普通函數定義的區別是在 def關鍵字前增加 async

# 非同步函數
import asyncio
# 非同步函數
async def func(x):
    print("非同步函數")
    return x ** 2
ret = func(2)
print(ret)

執行程式碼輸入如下內容:

sys:1: RuntimeWarning: coroutine 'func' was never awaited
<coroutine object func at 0x0000000002C8C248>

函數返回一個協程物件,如果想要函數得到執行,需要將其放到事件迴圈 event_loop中。

事件迴圈 event_loop

event_loop是 asyncio模組的核心,它將非同步函數註冊到事件迴圈上。 過程實現方式為:由 loop在適當的時候呼叫協程,這裡使用的方式名為 asyncio.get_event_loop(),然後由 run_until_complete(協程物件) 將協程註冊到事件迴圈中,並啟動事件迴圈。

import asyncio
# 非同步函數
async def func(x):
    print("非同步函數")
    return x ** 2
# 協程物件,該物件不能直接執行
coroutine1 = func(2)
# 事件迴圈物件
loop = asyncio.get_event_loop()
# 將協程物件加入到事件迴圈中,並執行
ret = loop.run_until_complete(coroutine1)
print(ret)

首先在 python 3.7 之前的版本中使用非同步函數是安裝上述流程:

  • 先通過 asyncio.get_event_loop()獲取事件迴圈loop物件;
  • 然後通過不同的策略呼叫 loop.run_until_complete()或者loop.run_forever()執行非同步函數。

在 python 3.7 之後的版本,直接使用 asyncio.run() 即可,該函數總是會建立一個新的事件迴圈並在結束時進行關閉。

最新的官方檔案 都採用的是run方法。 官方案例

import asyncio
async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')
asyncio.run(main())

接下來在檢視一個完整的案例,並且結合await關鍵字。

import asyncio
import time
# 非同步函數1
async def task1(x):
    print("任務1")
    await asyncio.sleep(2)
    print("恢復任務1")
    return x
# 非同步函數2
async def task2(x):
    print("任務2")
    await asyncio.sleep(1)
    print("恢復任務2")
    return x
async def main():
    start_time = time.perf_counter()
    ret_1 = await task1(1)
    ret_2 = await task2(2)
    print("任務1 返回的值是", ret_1)
    print("任務2 返回的值是", ret_2)
    print("執行時間", time.perf_counter() - start_time)
if __name__ == '__main__':
	# 建立一個事件迴圈
    loop = asyncio.get_event_loop()
    # 將協程物件加入到事件迴圈中,並執行
    loop.run_until_complete(main())

程式碼輸出如下所示:

任務1
恢復任務1
任務2
恢復任務2
任務1 返回的值是 1
任務2 返回的值是 2
執行時間 2.99929154

上述程式碼建立了 3 個協程,其中 task1和 task2都放在了協程函數 main中,I/O 操作通過 asyncio.sleep(1)進行模擬,整個函數執行時間為 2.9999 秒,接近 3 秒,依舊是序列進行,如果希望修改為並行執行,將程式碼按照下述進行修改。

import asyncio
import time
# 非同步函數1
async def task1(x):
    print("任務1")
    await asyncio.sleep(2)
    print("恢復任務1")
    return x
# 非同步函數2
async def task2(x):
    print("任務2")
    await asyncio.sleep(1)
    print("恢復任務2")
    return x
async def main():
    start_time = time.perf_counter()
    ret_1,ret_2 = await asyncio.gather(task1(1),task2(2))
    print("任務1 返回的值是", ret_1)
    print("任務2 返回的值是", ret_2)
    print("執行時間", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

上述程式碼最大的變化是將task1task2放到了asyncio.gather()中執行,此時程式碼輸出時間明顯變短。

任務1
任務2
恢復任務2 # 任務2 由於等待時間短,先返回。
恢復任務1
任務1 返回的值是 1
任務2 返回的值是 2
執行時間 2.0005669480000003

asyncio.gather()可以更換為asyncio.wait()修改程式碼如下所示:

import asyncio
import time
# 非同步函數1
async def task1(x):
    print("任務1")
    await asyncio.sleep(2)
    print("恢復任務1")
    return x
# 非同步函數2
async def task2(x):
    print("任務2")
    await asyncio.sleep(1)
    print("恢復任務2")
    return x
async def main():
    start_time = time.perf_counter()
    done, pending = await asyncio.wait([task1(1), task2(2)])
    print(done)
    print(pending)
    print("執行時間", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

asyncio.wait()返回一個元組,其中包含一個已經完成的任務集合,一個未完成任務的集合。

gather 和 wait 的區別:

  • gather:需要所有任務都執行結束,如果任意一個協程函數崩潰了,都會拋異常,不會返回結果;
  • wait:可以定義函數返回的時機,可以設定為 FIRST_COMPLETED(第一個結束的), FIRST_EXCEPTION(第一個出現異常的), ALL_COMPLETED(全部執行完,預設的)。
done,pending = await asyncio.wait([task1(1),task2(2)],return_when=asyncio.tasks.FIRST_EXCEPTION)

建立 task

由於協程物件不能直接執行,在註冊到事件迴圈時,是run_until_complete方法將其包裝成一個 task物件。該物件是對coroutine物件的進一步封裝,它比coroutine物件多了執行狀態,例如 pendingrunningfinished,可以利用這些狀態獲取協程物件的執行情況。

下面顯示的將coroutine物件封裝成task物件,在上述程式碼基礎上進行修改。

import asyncio
import time
# 非同步函數1
async def task1(x):
    print("任務1")
    await asyncio.sleep(2)
    print("恢復任務1")
    return x
# 非同步函數2
async def task2(x):
    print("任務2")
    await asyncio.sleep(1)
    print("恢復任務2")
    return x
async def main():
    start_time = time.perf_counter()
    # 封裝 task 物件
    coroutine1 = task1(1)
    task_1 = loop.create_task(coroutine1)
    coroutine2 = task2(2)
    task_2 = loop.create_task(coroutine2)
    ret_1, ret_2 = await asyncio.gather(task_1, task_2)
    print("任務1 返回的值是", ret_1)
    print("任務2 返回的值是", ret_2)
    print("執行時間", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

由於task物件是future物件的子類物件,所以上述程式碼也可以按照下述內容修改:

# task_2 = loop.create_task(coroutine2)
task_2 = asyncio.ensure_future(coroutine2)

下面將task物件的各個狀態進行列印輸出。

import asyncio
import time
# 非同步函數1
async def task1(x):
    print("任務1")
    await asyncio.sleep(2)
    print("恢復任務1")
    return x
# 非同步函數2
async def task2(x):
    print("任務2")
    await asyncio.sleep(1)
    print("恢復任務2")
    return x
async def main():
    start_time = time.perf_counter()
    # 封裝 task 物件
    coroutine1 = task1(1)
    task_1 = loop.create_task(coroutine1)
    coroutine2 = task2(2)
    # task_2 = loop.create_task(coroutine2)
    task_2 = asyncio.ensure_future(coroutine2)
    # 進入 pending 狀態
    print(task_1)
    print(task_2)
    # 獲取任務的完成狀態
    print(task_1.done(), task_2.done())
    # 執行任務
    await task_1
    await task_2
    # 再次獲取完成狀態
    print(task_1.done(), task_2.done())
    # 獲取返回結果
    print(task_1.result())
    print(task_2.result())
    print("執行時間", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

await task_1表示的是執行該協程,執行結束之後,task.done()返回 Truetask.result()獲取返回值。

回撥返回值

當協程執行完畢,需要獲取其返回值,剛才已經演示了一種辦法,使用 task.result()方法獲取,但是該方法僅當協程執行完畢時,才能獲取結果,如果協程沒有執行完畢,result()方法會返回 asyncio.InvalidStateError(無效狀態錯誤)。

一般編碼都採用第二種方案,通過add_done_callback()方法系結回撥。

import asyncio
import requests
async def request_html():
    url = 'https://www.csdn.net'
    res = requests.get(url)
    return res.status_code
def callback(task):
    print('回撥:', task.result())
loop = asyncio.get_event_loop()
coroutine = request_html()
task = loop.create_task(coroutine)
# 繫結回撥
task.add_done_callback(callback)
print(task)
print("*"*100)
loop.run_until_complete(task)
print(task)

上述程式碼當coroutine執行完畢時,會呼叫callback函數。

如果回撥函數需要多個引數,請使用functools模組中的偏函數(partial)方法

迴圈事件關閉

建議每次編碼結束之後,都呼叫回圈事件物件close()方法,徹底清理loop物件。

2.本節爬蟲專案

本節課要採集的站點由於全部都是 coser 圖片,所以地址在程式碼中檢視即可。

完整程式碼如下所示:

import threading
import asyncio
import time
import requests
import lxml
from bs4 import BeautifulSoup
async def get(url):
    return requests.get(url)
async def get_html(url):
    print("準備抓取:", url)
    res = await get(url)
    return res.text
async def save_img(img_url):
    # thumbMid_5ae3e05fd3945 將小圖替換為大圖
    img_url = img_url.replace('thumb','thumbMid')
    img_url = "http://mycoser.com/" + img_url
    print("圖片下載中:", img_url)
    res = await get(img_url)
    if res is not None:
        with open(f'./imgs/{time.time()}.jpg', 'wb') as f:
            f.write(res.content)
            return img_url,"ok"
async def main(url_list):
    # 建立 5 個任務
    tasks = [asyncio.ensure_future(get_html(url_list[_])) for _ in range(len(url_list))]
    dones, pending = await asyncio.wait(tasks)
    for task in dones:
        html = task.result()
        soup = BeautifulSoup(html, 'lxml')
        divimg_tags = soup.find_all(attrs={'class': 'workimage'})
        for div in divimg_tags:
            ret = await save_img(div.a.img["data-original"])
            print(ret)
if __name__ == '__main__':
    urls = [f"http://mycoser.com/picture/lists/p/{page}" for page in range(1, 17)]
    totle_page = len(urls) // 5 if len(urls) % 5 == 0 else len(urls) // 5 + 1
    # 對 urls 列表進行切片,方便採集
    for page in range(0, totle_page):
        start_page = 0 if page == 0 else page * 5
        end_page = (page + 1) * 5
        # 迴圈事件物件
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(urls[start_page:end_page]))

程式碼說明:上述程式碼中第一個要注意的是await關鍵字後面只能跟如下內容:

  • 原生的協程物件;
  • 一個包含await方法的物件返回的一個迭代器。

所以上述程式碼get_html函數中巢狀了一個協程 get。主函數 main裡面為了運算方便,直接對 urls 進行了切片,然後通過迴圈進行執行。

當然上述程式碼的最後兩行,可以直接修改為:

 # 迴圈事件物件
 # loop = asyncio.get_event_loop()
 #
 # loop.run_until_complete(main(urls[start_page:end_page]))
 asyncio.run(main(urls[start_page:end_page]))

輕鬆獲取一堆高清圖片:

到此這篇關於python協程與 asyncio 庫詳情的文章就介紹到這了,更多相關python 協程內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com