首頁 > 軟體

Python並行程式設計之未來模組Futures

2022-05-18 10:01:14

不論是哪一種語言,並行程式設計都是一項非常重要的技巧。比如我們上一章用的爬蟲,就被廣泛用在工業的各個領域。我們每天在各個網站、App上獲取的新聞資訊,很大一部分都是通過並行程式設計版本的爬蟲獲得的。

正確併合理的使用並行程式設計,無疑會給我們的程式帶來極大效能上的提升。今天我們就一起學習Python中的並行程式設計——Futures。

區分並行和並行

我們在學習並行程式設計時,常常會聽到兩個詞:並行(Concurrency)和並行(Parallelism)這兩個術語。這兩者經常一起使用,導致很多人以為他們是一個意思,其實是不對的。

首先要辨別一個誤區,在Python中,並行並不是只同一時刻上右多個操作(thread或者task)同時進行。相反,在某個特定的時刻上它只允許有一個操作的發生,只不過執行緒或任務之間會相互切換直到完成,就像下面的圖裡表達的

在上圖中出現了task和thread兩種切換順序的不同方式。分別對應了Python中並行兩種形式——threading和asyncio。

對於執行緒,作業系統知道每個執行緒的所有資訊,因此他會做主在適當的時候做執行緒切換,這樣的好處就是程式碼容易編寫,因為程式設計師不需要做任何切換操作的處理;但是切換執行緒的操作,有可能出現在一個語句的執行過程中( 比如X+=1),這樣比較容易出現race condiiton的情況。

而對於asyncio,主程式想要切換任務的時候必須得到此任務可以被切換的通知,這樣一來就可以避免出現上面的race condition的情況。

至於所謂的並行,只在同一時刻、同時發生。Python中的multi-Processing便是這個意思對應多程序,我們可以這麼簡單的理解,如果我們的電腦是8核的CPU,那麼在執行程式時,我們可以強制Python開啟8個程序,同時執行,用以加快程式的執行速度。大概是下面這個圖的思路

對比看來,並行通常用於I/O操作頻繁的場景。比方我們要從網站上下載多個檔案,由於I/O操作的時間要比CPU操作的時長多的多,這時並行就比較適合。而在CPU使用比較heavy的場景中,為了加快執行速度,我們會多用幾臺機器,讓多個處理器來運算。

還記得以前寫了個部落格總結過:在Python中的多執行緒是依靠CPU切換上下文實現的一種“偽多執行緒”,在進行大量執行緒切換過程中會佔用比較多的CPU資源,而在進行IO操作時候(不論是在網路上進行資料互動還是從記憶體、硬碟上讀寫資料)是不需要CPU進行計算的。所以多執行緒只適用於IO操作密集的環境,不適用於計算密集型操作。

並行程式設計之Futures

單執行緒於多執行緒效能比較

我們下面通過一個範例,從程式碼的角度來理解並行程式設計中的Futures,並進一步比較其於單執行緒的效能區別

假設我們有個任務,從網站上下載一些內容然後列印出來,如果用單執行緒的方式是這樣實現的

import requests
import time
def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content),url))
def download_all(urls):
    for url in urls:
        download_one(url)
def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society', 
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)' 
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites),end_time-start_time))
if __name__ == '__main__':
    main()

這是種最簡單暴力最直接的方式:

先遍歷儲存網站的列表

對當前的網站進行下載操作

當前操作完成後,再對下一個網站進行同樣的操作,一直到結束。

可以試出來總耗時大概是2s多,單執行緒的方式簡單明瞭,但是最大的問題是效率低下,程式最大的時間都消耗在I/O等待上(這還是用的print,如果是寫在硬碟上的話時間會更多)。如果在實際生產環境中,我們需要存取的網站至少是以萬為單位的,所以這個方案根本行不通。

接著我們看看多執行緒版本的程式碼

import concurrent.futures
import requests
import threading
import time
def download_one(url):
    resp = requests.get(url).content
    print('Read {} from {}'.format(len(resp),url))
def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one,sites)
def main():
    sites = [
    'https://en.wikipedia.org/wiki/Portal:Arts',
    'https://en.wikipedia.org/wiki/Portal:History',
    'https://en.wikipedia.org/wiki/Portal:Society', 
    'https://en.wikipedia.org/wiki/Portal:Biography',
    'https://en.wikipedia.org/wiki/Portal:Mathematics',
    'https://en.wikipedia.org/wiki/Portal:Technology',
    'https://en.wikipedia.org/wiki/Portal:Geography',
    'https://en.wikipedia.org/wiki/Portal:Science',
    'https://en.wikipedia.org/wiki/Computer_science',
    'https://en.wikipedia.org/wiki/Python_(programming_language)',
    'https://en.wikipedia.org/wiki/Java_(programming_language)',
    'https://en.wikipedia.org/wiki/PHP',
    'https://en.wikipedia.org/wiki/Node.js',
    'https://en.wikipedia.org/wiki/The_C_Programming_Language',
    'https://en.wikipedia.org/wiki/Go_(programming_language)' 
    ]
    start_time = time.perf_counter()
    download_all(sites)
    # for i in sites:
    end_time = time.perf_counter()
    # print('Down {} sites in {} seconds'.format(len(sites),end_time-start_time))
if __name__ == '__main__':
    main()

這段程式碼的執行時長大概是0.2s,效率一下提升了10倍多,可以注意到這個版本和單執行緒的區別主要在下面:

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one,sites)

在上面的程式碼中我們建立了一個執行緒池,有5個執行緒可以分配使用。executer.map()與以前將的Python內建的map()函數,表示對sites中的每一個元素並行的呼叫函數download_one()函數。

順便提一下,在download_one()函數中,我們使用的requests.get()方法是執行緒安全的(thread-safe),因此在多執行緒的環境下,它也可以安全使用,並不會出現race condition(條件競爭)的情況。

另外,雖然執行緒的數量可以自己定義,但是執行緒數並不是越多越好,以為執行緒的建立、維護和刪除也需要一定的開銷。所以如果設定的很大,反而會導致速度變慢,我們往往要根據實際的需求做一些測試,來尋找最優的執行緒數量。

當然,我們也可以用並行的方式去提高執行效率,只需要在download_all()函數中做出下面的變化即可

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        to_do = []
        for site in sites:
            future = executor.submit(download_one,site)
            to_do.append(site)

        for future in concurrent.futures.as_completed(to_do):
            future.result()

在需要改的這部分程式碼中,函數ProcessPoolExecutor()表示建立程序池,使用多個程序並行的執行程式。不過,這裡 通常省略引數workers,因為系統會自動返回CPU的數量作為可以呼叫的程序數。

就像上面說的,並行方式一般用在CPU密集型的場景中,因為對於I/O密集型操作多數時間會用於等待,相比於多執行緒,使用多程序並不會提升效率,反而很多時候,因為CPU數量的限制,會導致執行效率不如多執行緒版本。

到底什麼是Futures?

Python中的Futures,位於concurrent.futures和asyncio中,他們都表示帶有延遲的操作,Futures會將處於等待狀態的操作包裹起來放到佇列中,這些操作的狀態可以隨時查詢。而他們的結果或是異常,也能在操作後被獲取。

通常,作為使用者,我們不用考慮如何去建立Futures,這些Futures底層會幫我們處理好,我們要做的就是去schedule這些Futures的執行。比方說,Futures中的Executor類,當我們中的方法done(),表示相對應的操作是否完成——用True表示已完成,ongFalse表示未完成。不過,要注意的是done()是non-blocking的,會立刻返回結果,相對應的add_done_callback(fn),則表示Futures完成後,相對應的引數fn,會被通知並執行呼叫。

Futures裡還有一個非常重要的函數result(),用來表示future完成後,返回器對應的結果或異常。而as_completed(fs),則是針對給定的future迭代器fs,在其完成後,返回完成後的迭代器。

所以也可以把上面的例子寫成下面的形式:

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        to_do = []
        for site in sites:
            future = executor.submit(download_one,site)
            to_do.append(site)
        for future in concurrent.futures.as_completed(to_do):
            future.result()

這裡,我們首先用executor.submit(),將下載每個網站的內容都放進future佇列to_do裡等待執行。然後是as_completed()函數,在future完成後輸出結果

不過這裡有個事情要注意一下:future列表中每個future完成的順序和他在列表中的順序不一定一致,至於哪個先完成,取決於系統的排程和每個future的執行時間。

為什麼多執行緒每次只有一個執行緒執行?

前面我們講過,在一個時刻下,Python主程式只允許有一個執行緒執行,所以Python的並行,是通過多執行緒的切換完成的,這是為什麼呢?

這就又和以前講的知識串聯到一起了——GIL(全域性直譯器鎖),這裡在複習下:

事實上,Python的直譯器並不是執行緒安全的,為了解決由此帶來的race condition等問題,Python就引入了GIL,也就是在同一個時刻,只允許一個執行緒執行。當然,在進行I/O操作是,如果一個執行緒被block了,GIL就會被釋放,從而讓另一個執行緒能夠繼續執行。

總結

這節課裡我們先學習了Python中並行和並行的概念

並行——通過執行緒(thread)和任務(task)之間相互切換的方式實現,但是同一時刻,只允許有一個執行緒或任務執行

並行——多個程序同時進行。

並行通常用於I/O頻繁操作的場景,而並行則適用於CPU heavy的場景

隨後我們通過一個下載網站內容的例子,比較了單執行緒和運用FUtures的多執行緒版本的效能差異,顯而易見,合理的運用多執行緒,能夠極大的提高程式執行效率。

我們還大致瞭解了Futures的方式,介紹了一些常用的函數,並輔以範例加以理解。

要注意,Python中之所以同一時刻只允許一個執行緒執行,其實是由於GIL的存在。但是對於I/O操作而言,當其被block的時候,GIL會被釋放,使其他執行緒繼續執行。

以上就是Python並行程式設計之未來模組Futures的詳細內容,更多關於Python並行未來模組Futures的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com