<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
雷猴啊,兄弟們!今天來展示一下如何用Python快速實現一個執行緒池。
當有多個 IO 密集型的任務要被處理時,我們自然而然會想到多執行緒。但如果任務非常多,我們不可能每一個任務都啟動一個執行緒去處理,這個時候最好的辦法就是實現一個執行緒池,至於池子裡面的執行緒數量可以根據業務場景進行設定。
比如我們實現一個有 10 個執行緒的執行緒池,這樣可以並行地處理 10 個任務,每個執行緒將任務執行完之後,便去執行下一個任務。通過使用執行緒池,可以避免因執行緒建立過多而導致資源耗盡,而且任務在執行時的生命週期也可以很好地把控。
而執行緒池的實現方式也很簡單,但這裡我們不打算手動實現,因為 Python 提供了一個標準庫 concurrent.futures,已經內建了對執行緒池的支援。所以本篇文章,我們就來詳細介紹一下該模組的用法。
當我們往執行緒池裡面提交一個函數時,會分配一個執行緒去執行,同時立即返回一個 Future 物件。通過 Future 物件可以監控函數的執行狀態,有沒有出現異常,以及有沒有執行完畢等等。如果函數執行完畢,內部便會呼叫 future.set_result 將返回值設定到 future 裡面,然後外界便可呼叫 future.result 拿到返回值。
除此之外 future 還可以繫結回撥,一旦函數執行完畢,就會以 future 為引數,自動觸發回撥。所以 future 被稱為未來物件,可以把它理解為函數的一個容器,當我們往執行緒池提交一個函數時,會立即建立相應的 future 然後返回。函數的執行狀態什麼的,都通過 future 來檢視,當然也可以給它繫結一個回撥,在函數執行完畢時自動觸發。
那麼下面我們就來看一下 future 的用法,文字的話理解起來可能有點枯燥。
將函數提交到執行緒池裡面執行時,會立即返回一個物件
這個物件就叫做 Future 物件,裡面包含了函數的執行狀態等等
當然我們也可以手動建立一個Future物件。
from concurrent.futures import Future # 建立 Future 物件 future future = Future() # 給 future 繫結回撥 def callback(f: Future): print("當set_result的時候會執行回撥,result:", f.result()) future.add_done_callback(callback) # 通過 add_done_callback 方法即可給 future 繫結回撥 # 呼叫的時候會自動將 future 作為引數 # 如果需要多個引數,那麼就使用偏函數 # 回撥函數什麼時候執行呢? # 顯然是當 future 執行 set_result 的時候 # 如果 future 是向執行緒池提交函數時返回的 # 那麼當函數執行完畢時會自動執行 future.set_result(xx) # 並將自身的返回設定進去 # 而這裡的 future 是我們手動建立的,因此需要手動執行 future.set_result("嘿嘿")
當set_result的時候會執行回撥,result: 嘿嘿
需要注意的是:只能執行一次 set_result,但是可以多次呼叫 result 獲取結果。
from concurrent.futures import Future future = Future() future.set_result("哼哼") print(future.result()) # 哼哼 print(future.result()) # 哼哼 print(future.result()) # 哼哼
執行 future.result() 之前一定要先 set_result,否則會一直處於阻塞狀態。當然 result 方法還可以接收一個 timeout 引數,表示超時時間,如果在指定時間內沒有獲取到值就會丟擲異常。
我們上面是手動建立的 Future 物件,但工作中很少會手動建立。我們將函數提交到執行緒池裡面執行的時候,會自動建立 Future 物件並返回。這個 Future 物件裡面就包含了函數的執行狀態,比如此時是處於暫停、執行中還是完成等等,並且函數在執行完畢之後,還會呼叫 future.set_result 將自身的返回值設定進去。
from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" # 建立一個執行緒池 # 裡面還可以指定 max_workers 引數,表示最多建立多少個執行緒 # Python學習交流裙279199867 # 如果不指定,那麼每提交一個函數,都會為其建立一個執行緒 executor = ThreadPoolExecutor() # 通過 submit 即可將函數提交到執行緒池,一旦提交,就會立刻執行 # 因為開啟了一個新的執行緒,主執行緒會繼續往下執行 # 至於 submit 的引數,按照函數名,對應引數提交即可 # 切記不可寫成task("古明地覺", 3),這樣就變成呼叫了 future = executor.submit(task, "螢幕前的你", 3) # 由於函數裡面出現了 time.sleep,並且指定的 n 是 3 # 所以函數內部會休眠 3 秒,顯然此時處於執行狀態 print(future) """ <Future at 0x7fbf701726d0 state=running> """ # 我們說 future 相當於一個容器,包含了內部函數的執行狀態 # 函數是否正在執行中 print(future.running()) """ True """ # 函數是否執行完畢 print(future.done()) """ False """ # 主程式也 sleep 3 秒 time.sleep(3) # 顯然此時函數已經執行完畢了 # 並且列印結果還告訴我們返回值型別是 str print(future) """ <Future at 0x7fbf701726d0 state=finished returned str> """ print(future.running()) """ False """ print(future.done()) """ True """ # 函數執行完畢時,會將返回值設定在 future 裡 # 也就是說一旦執行了 future.set_result # 那麼就表示函數執行完畢了,然後外界可以呼叫 result 拿到返回值 print(future.result()) """ 螢幕前的你 睡了 3 秒 """
這裡再強調一下 future.result(),這一步是會阻塞的,舉個例子:
# 提交函數 future = executor.submit(task, "螢幕前的你", 3) start = time.perf_counter() future.result() end = time.perf_counter() print(end - start) # 3.00331525
可以看到,future.result() 這一步花了將近 3s。其實也不難理解,future.result() 是幹嘛的?就是為了獲取函數的返回值,可函數都還沒有執行完畢,它又從哪裡獲取呢?所以只能先等待函數執行完畢,將返回值通過 set_result 設定到 future 裡面之後,外界才能呼叫 future.result() 獲取到值。
如果不想一直等待的話,那麼在獲取值的時候可以傳入一個超時時間。
from concurrent.futures import ( ThreadPoolExecutor, TimeoutError ) import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() future = executor.submit(task, "螢幕前的你", 3) try: # 1 秒之內獲取不到值,丟擲 TimeoutError res = future.result(1) except TimeoutError: pass # 再 sleep 2 秒,顯然函數執行完畢了 time.sleep(2) # 獲取返回值 print(future.result()) """ 螢幕前的你 睡了 3 秒 """
當然啦,這麼做其實還不夠智慧,因為我們不知道函數什麼時候執行完畢。所以最好的辦法還是繫結一個回撥,當函數執行完畢時,自動觸發回撥。
from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" def callback(f): print(f.result()) executor = ThreadPoolExecutor() future = executor.submit(task, "螢幕前的你", 3) # 繫結回撥,3 秒之後自動呼叫 future.add_done_callback(callback) """ 螢幕前的你 睡了 3 秒 """
需要注意的是,在呼叫 submit 方法之後,提交到執行緒池的函數就已經開始執行了。而不管函數有沒有執行完畢,我們都可以給對應的 future 繫結回撥。
如果函數完成之前新增回撥,那麼會在函數完成後觸發回撥。如果函數完成之後新增回撥,由於函數已經完成,代表此時的 future 已經有值了,或者說已經 set_result 了,那麼會立即觸發回撥。
當函數執行完畢之後,會執行 set_result,那麼這個方法到底幹了什麼事情呢?
我們看到 future 有兩個被保護的屬性,分別是 _result 和 _state。顯然 _result 用於儲存函數的返回值,而 future.result() 本質上也是返回 _result 屬性的值。而 _state 屬性則用於表示函數的執行狀態,初始為 PENDING,執行中為 RUNING,執行完畢時被設定為 FINISHED。
呼叫 future.result() 的時候,會判斷 _state 的屬性,如果還在執行中就一直等待。當 _state 為 FINISHED 的時候,就返回 _result 屬性的值。
我們上面每次只提交了一個函數,但其實可以提交任意多個,我們來看一下:
from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() futures = [executor.submit(task, "螢幕前的你", 3), executor.submit(task, "螢幕前的你", 4), executor.submit(task, "螢幕前的你", 1)] # 此時都處於running print(futures) """ [<Future at 0x1b5ff622550 state=running>, <Future at 0x1b5ff63ca60 state=running>, <Future at 0x1b5ff63cdf0 state=running>] """ time.sleep(3) # 主程式 sleep 3s 後 # futures[0]和futures[2]處於 finished # futures[1]仍處於 running print(futures) """ [<Future at 0x1b5ff622550 state=running>, <Future at 0x1b5ff63ca60 state=running>, <Future at 0x1b5ff63cdf0 state=finished returned str>] """
如果是多個函數,要如何拿到返回值呢?很簡單,遍歷 futures 即可。
executor = ThreadPoolExecutor() futures = [executor.submit(task, "螢幕前的你", 5), executor.submit(task, "螢幕前的你", 2), executor.submit(task, "螢幕前的你", 4), executor.submit(task, "螢幕前的你", 3), executor.submit(task, "螢幕前的你", 6)] for future in futures: print(future.result()) """ 螢幕前的你 睡了 5 秒 螢幕前的你 睡了 2 秒 螢幕前的你 睡了 4 秒 螢幕前的你 睡了 3 秒 螢幕前的你 睡了 6 秒 """
這裡面有一些值得說一說的地方,首先 futures 裡面有 5 個 future,記做 future1, future2, future3, future4, future5。
當使用 for 迴圈遍歷的時候,實際上會依次遍歷這 5 個 future,所以返回值的順序就是我們新增的函數的順序。由於 future1 對應的函數休眠了 5s,那麼必須等到 5s 後,future1 裡面才會有值。
但這五個函數是並行執行的,future2, future3, future4 由於只休眠了 2s, 4s, 3s,所以肯定會先執行完畢,然後執行 set_result,將返回值設定到對應的 future 裡。
但 Python 的 for 迴圈不可能在第一次迭代還沒有結束,就去執行第二次迭代。因為 futures 裡面的幾個 future 的順序已經一開始就被定好了,只有當第一個 future.result() 執行完成之後,才會執行第二個 future.result(),以及第三個、第四個。
因此即便後面的函數已經執行完畢,但由於 for 迴圈的順序,也只能等著,直到前面的 future.result() 執行完畢。所以當第一個 future.result() 結束時,後面三個 future.result() 會立刻輸出,因為它們內部的函數已經執行結束了。
而最後一個 future,由於內部函數 sleep 了 6 秒,因此要再等待 1 秒,才會列印 future.result()。
使用 submit 提交函數會返回一個 future,並且還可以給 future 繫結一個回撥。但如果不關心回撥的話,那麼還可以使用 map 進行提交。
executor = ThreadPoolExecutor() # map 內部也是使用了 submit results = executor.map(task, ["螢幕前的你"] * 3, [3, 1, 2]) # 並且返回的是迭代器 print(results) """ <generator object ... at 0x0000022D78EFA970> """ # 此時遍歷得到的是不再是 future # 而是 future.result() for result in results: print(result) """ 螢幕前的你 睡了 3 秒 螢幕前的你 睡了 1 秒 螢幕前的你 睡了 2 秒 """
可以看到,當使用for迴圈的時候,map 執行的邏輯和 submit 是一樣的。唯一的區別是,此時不需要再呼叫 result 了,因為返回的就是函數的返回值。
或者我們直接呼叫 list 也行。
executor = ThreadPoolExecutor() results = executor.map(task, ["螢幕前的你"] * 3, [3, 1, 2]) print(list(results)) """ ['螢幕前的你 睡了 3 秒', '螢幕前的你 睡了 1 秒', '螢幕前的你 睡了 2 秒'] """
results 是一個生成器,呼叫 list 的時候會將裡面的值全部產出。由於 map 內部還是使用的 submit,然後通過 future.result() 拿到返回值,而耗時最長的函數需要 3 秒,因此這一步會阻塞 3 秒。3 秒過後,會列印所有函數的返回值。
上面在獲取返回值的時候,是按照函數的提交順序獲取的。如果我希望哪個函數先執行完畢,就先獲取哪個函數的返回值,該怎麼做呢?
from concurrent.futures import ( ThreadPoolExecutor, as_completed ) import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() futures = [executor.submit(task, "螢幕前的你", 5), executor.submit(task, "螢幕前的你", 2), executor.submit(task, "螢幕前的你", 1), executor.submit(task, "螢幕前的你", 3), executor.submit(task, "螢幕前的你", 4)] for future in as_completed(futures): print(future.result()) """ 螢幕前的你 睡了 1 秒 螢幕前的你 睡了 2 秒 螢幕前的你 睡了 3 秒 螢幕前的你 睡了 4 秒 螢幕前的你 睡了 5 秒 """
此時誰先完成,誰先返回。
我們通過 submit 可以將函數提交到執行緒池中執行,但如果我們想取消該怎麼辦呢?
executor = ThreadPoolExecutor() future1 = executor.submit(task, "螢幕前的你", 1) future2 = executor.submit(task, "螢幕前的你", 2) future3 = executor.submit(task, "螢幕前的你", 3) # 取消函數的執行 # 會將 future 的 _state 屬性設定為 CANCELLED future3.cancel() # 檢視是否被取消 print(future3.cancelled()) # False
問題來了,呼叫 cancelled 方法的時候,返回的是False,這是為什麼?很簡單,因為函數已經被提交到執行緒池裡面了,函數已經執行了。而只有在還沒有執行時,取消才會成功。
可這不矛盾了嗎?函數一旦提交就會執行,只有不執行才會取消成功,這怎麼辦?還記得執行緒池的一個叫做 max_workers 的引數嗎?用來控制執行緒池內的執行緒數量,我們可以將最大的執行緒數設定為2,那麼當第三個函數進去的時候,就不會執行了,而是處於暫停狀態。
executor = ThreadPoolExecutor(max_workers=2) future1 = executor.submit(task, "螢幕前的你", 1) future2 = executor.submit(task, "螢幕前的你", 2) future3 = executor.submit(task, "螢幕前的你", 3) # 如果池子裡可以建立空閒執行緒 # 那麼函數一旦提交就會執行,狀態為 RUNNING print(future1._state) # RUNNING print(future2._state) # RUNNING # 但 future3 內部的函數還沒有執行 # 因為池子裡無法建立新的空閒執行緒了,所以狀態為 PENDING print(future3._state) # PENDING # 取消函數的執行,前提是函數沒有執行 # 會將 future 的 _state 屬性設定為 CANCELLED future3.cancel() # 檢視是否被取消 print(future3.cancelled()) # True print(future3._state) # CANCELLED
在啟動執行緒池的時候,肯定是需要設定容量的,不然處理幾千個函數要開啟幾千個執行緒嗎。另外當函數被取消了,就不可以再呼叫 future.result() 了,否則的話會丟擲 CancelledError。
我們前面的邏輯都是函數正常執行的前提下,但天有不測風雲,如果函數執行時出現異常了該怎麼辦?
from concurrent.futures import ThreadPoolExecutor def task1(): 1 / 0 def task2(): pass executor = ThreadPoolExecutor(max_workers=2) future1 = executor.submit(task1) future2 = executor.submit(task2) print(future1) print(future2) """ <Future at 0x7fe3e00f9e50 state=finished raised ZeroDivisionError> <Future at 0x7fe3e00f9eb0 state=finished returned NoneType> """ # 結果顯示 task1 函數執行出現異常了 # 那麼這個異常要怎麼獲取呢? print(future1.exception()) print(future1.exception().__class__) """ division by zero <class 'ZeroDivisionError'> """ # 如果執行沒有出現異常,那麼 exception 方法返回 None print(future2.exception()) # None # 注意:如果函數執行出現異常了 # 那麼呼叫 result 方法會將異常丟擲來 future1.result() """ Traceback (most recent call last): File "...", line 4, in task1 1 / 0 ZeroDivisionError: division by zero """
出現異常時,呼叫 future.set_exception 將異常設定到 future 裡面,而 future 有一個 _exception 屬性,專門儲存設定的異常。當呼叫 future.exception() 時,也會直接返回 _exception 屬性的值。
假設我們往執行緒池提交了很多個函數,如果希望提交的函數都執行完畢之後,主程式才能往下執行,該怎麼辦呢?其實方案有很多:
第一種:
from concurrent.futures import ThreadPoolExecutor import time def task(n): time.sleep(n) return f"sleep {n}" executor = ThreadPoolExecutor() future1 = executor.submit(task, 5) future2 = executor.submit(task, 2) future3 = executor.submit(task, 4) # 這裡是不會阻塞的 print("start") # 遍歷所有的 future,並呼叫其 result 方法 # 這樣就會等到所有的函數都執行完畢之後才會往下走 for future in [future1, future2, future3]: print(future.result()) print("end") """ start sleep 5 sleep 2 sleep 4 end """
第二種:
from concurrent.futures import ( ThreadPoolExecutor, wait ) import time def task(n): time.sleep(n) return f"sleep {n}" executor = ThreadPoolExecutor() future1 = executor.submit(task, 5) future2 = executor.submit(task, 2) future3 = executor.submit(task, 4) # return_when 有三個可選引數 # FIRST_COMPLETED:當任意一個任務完成或者取消 # FIRST_EXCEPTION:當任意一個任務出現異常 # 如果都沒出現異常等同於ALL_COMPLETED # ALL_COMPLETED:所有任務都完成,預設是這個值 fs = wait([future1, future2, future3], return_when="ALL_COMPLETED") # 此時返回的fs是DoneAndNotDoneFutures型別的namedtuple # 裡面有兩個值,一個是done,一個是not_done print(fs.done) """ {<Future at 0x1df1400 state=finished returned str>, <Future at 0x2f08e48 state=finished returned str>, <Future at 0x9f7bf60 state=finished returned str>} """ print(fs.not_done) """ set() """ for f in fs.done: print(f.result()) """ start sleep 5 sleep 2 sleep 4 end """
第三種:
# 使用上下文管理 with ThreadPoolExecutor() as executor: future1 = executor.submit(task, 5) future2 = executor.submit(task, 2) future3 = executor.submit(task, 4) # 所有函數執行完畢(with語句結束)後才會往下執行
第四種:
executor = ThreadPoolExecutor() future1 = executor.submit(task, 5) future2 = executor.submit(task, 2) future3 = executor.submit(task, 4) # 所有函數執行結束後,才會往下執行 executor.shutdown()
如果我們需要啟動多執行緒來執行函數的話,那麼不妨使用執行緒池。每呼叫一個函數就從池子裡面取出一個執行緒,函數執行完畢就將執行緒放回到池子裡以便其它函數執行。如果池子裡面空了,或者說無法建立新的空閒執行緒,那麼接下來的函數就只能處於等待狀態了。
最後,concurrent.futures 不僅可以用於實現執行緒池,還可以用於實現程序池。兩者的 API 是一樣的:
from concurrent.futures import ProcessPoolExecutor import time def task(n): time.sleep(n) return f"sleep {n}" executor = ProcessPoolExecutor() # Windows 上需要加上這一行 if __name__ == '__main__': future1 = executor.submit(task, 5) future2 = executor.submit(task, 2) future3 = executor.submit(task, 4) executor.shutdown() print(future1.result()) print(future2.result()) print(future3.result()) """ sleep 5 sleep 2 sleep 4 """
執行緒池和程序池的 API 是一致的,但工作中很少會建立程序池。
到此這篇關於Python執行緒池的實現淺析的文章就介紹到這了,更多相關Python執行緒池內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45