<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
所以執行緒池的思想就是:每個執行緒各自分配一個任務,剩下的任務排隊等待,當某個執行緒完成了任務的時候,排隊任務就可以安排給這個執行緒繼續執行。
它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,
分別實現了對threading模組和multiprocessing模組的進一步抽象。
不僅可以幫我們自動排程執行緒,還可以做到:
# -*-coding:utf-8 -*- from concurrent.futures import ThreadPoolExecutor import time # 引數times用來模擬網路請求時間 def get_html(times): print("get page {}s finished".format(times)) return times # 建立執行緒池 # 設定執行緒池中最多能同時執行的執行緒數目,其他等待 executor = ThreadPoolExecutor(max_workers=2) # 通過submit函數提交執行的函數到執行緒池中,submit函數立即返回,不阻塞 # task1和task2是任務控制程式碼 task1 = executor.submit( get_html, (2) ) task2 = executor.submit( get_html, (3) ) # done()方法用於判斷某個任務是否完成,bool型,完成返回True,沒有完成返回False print( task1.done() ) # cancel()方法用於取消某個任務,該任務沒有放到執行緒池中才能被取消,如果已經放進執行緒池子中,則不能被取消 # bool型,成功取消了返回True,沒有取消返回False print( task2.cancel() ) # result()方法可以獲取task的執行結果,前提是get_html()函數有返回值 print( task1.result() ) print( task2.result() ) # 結果: # get page 3s finished # get page 2s finished # True # False # 2 # 3
ThreadPoolExecutor類在構造範例的時候,傳入max_workers引數來設定執行緒池中最多能同時執行的執行緒數目
使用submit()函數來提交執行緒需要執行任務(函數名和引數)到執行緒池中,並返回該任務的控制程式碼,
注意:submit()不是阻塞的,而是立即返回。
通過submit()函數返回的任務控制程式碼,能夠使用done()方法判斷該任務是否結束,使用cancel()方法來取消,使用result()方法可以獲取任務的返回值,檢視內部程式碼,發現該方法是阻塞的
上面雖然提供了判斷任務是否結束的方法,但是不能在主執行緒中一直判斷,有時候我們是得知某個任務結束了,就去獲取結果,而不是一直判斷每個任務有沒有結束。這時候就可以使用as_completed方法一次取出所有任務的結果。
# -*-coding:utf-8 -*- from concurrent.futures import ThreadPoolExecutor, as_completed import time # 引數times用來模擬網路請求時間 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times # 建立執行緒池子 # 設定最多2個執行緒執行,其他等待 executor = ThreadPoolExecutor(max_workers=2) urls = [3,2,4] # 一次性把所有的任務都放進執行緒池,得到一個控制程式碼,但是最多隻能同時執行2個任務 all_task = [ executor.submit(get_html,(each_url)) for each_url in urls ] for future in as_completed( all_task ): data = future.result() print("in main:get page {}s success".format(data)) # 結果 # get page 2s finished # in main:get page 2s success # get page 3s finished # in main:get page 3s success # get page 4s finished # in main:get page 4s success # 從結果可以看到,並不是先傳入哪個url,就先執行哪個url,沒有先後順序
除了上面的as_completed()方法,還可以使用execumap方法。但是有一點不同,使用map方法,不需提前使用submit方法,
map方法與python標準庫中的map含義相同,都是將序列中的每個元素都執行同一個函數。上面的程式碼就是對urls列表中的每個元素都執行get_html()函數,並分配各執行緒池。可以看到執行結果與上面的as_completed方法的結果不同,輸出順序和urls列表的順序相同,就算2s的任務先執行完成,也會先列印出3s的任務先完成,再列印2s的任務完成
# -*-coding:utf-8 -*- from concurrent.futures import ThreadPoolExecutor,as_completed import time # 引數times用來模擬網路請求時間 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times # 建立執行緒池子 # 設定最多2個執行緒執行,其他等待 executor = ThreadPoolExecutor(max_workers=2) urls = [3,2,4] for result in executor.map(get_html, urls): print("in main:get page {}s success".format(result))
結果:
get page 2s finished
get page 3s finished
in main:get page 3s success
in main:get page 2s success
get page 4s finished
in main:get page 4s success
wait方法可以讓主執行緒阻塞,直到滿足設定的要求。wait方法接收3個引數,等待的任務序列、超時時間以及等待條件。
等待條件return_when預設為ALL_COMPLETED,表明要等待所有的任務都借宿。可以看到執行結果中,確實是所有任務都完成了,主執行緒才列印出main,等待條件還可以設定為FIRST_COMPLETED,表示第一個任務完成就停止等待。
超時時間引數可以不設定:
wait()方法和as_completed(), map()沒有關係。不管你是用as_completed(),還是用map()方法,你都可以在執行主執行緒之前使用wait()。
as_completed()和map()是二選一的。
# -*-coding:utf-8 -*- from concurrent.futures import ThreadPoolExecutor,wait,ALL_COMPLETED,FIRST_COMPLETED import time # 引數times用來模擬網路請求時間 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times # 建立執行緒池子 # 設定最多2個執行緒執行,其他等待 executor = ThreadPoolExecutor(max_workers=2) urls = [3,2,4] all_task = [executor.submit(get_html,(url)) for url in urls] wait(all_task,return_when=ALL_COMPLETED) print("main") # 結果 # get page 2s finished # get page 3s finished # get page 4s finished # main
import datetime from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import current_thread import time, random, os import requests def task(name): print('%s %s is running'%(name,os.getpid())) #print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) if __name__ == '__main__': p = ProcessPoolExecutor(4) # 設定 for i in range(10): # 同步呼叫方式,不僅要呼叫,還要等返回值 obj = p.submit(task, "程序pid:") # 傳參方式(任務名,引數),引數使用位置或者關鍵字引數 res = obj.result() p.shutdown(wait=True) # 關閉程序池的入口,等待池內任務執行結束 print("主") ################ ################ # 另一個同步呼叫的demo def get(url): print('%s GET %s' % (os.getpid(),url)) time.sleep(3) response = requests.get(url) if response.status_code == 200: res = response.text else: res = "下載失敗" return res # 有返回值 def parse(res): time.sleep(1) print("%s 解析結果為%s" %(os.getpid(),len(res))) if __name__ == "__main__": urls = [ 'https://www.baidu.com', 'https://www.sina.com.cn', 'https://www.tmall.com', 'https://www.jd.com', 'https://www.python.org', 'https://www.openstack.org', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', ] p=ProcessPoolExecutor(9) l=[] start = time.time() for url in urls: future = p.submit(get,url) # 需要等結果,所以是同步呼叫 l.append(future) # 關閉程序池,等所有的程序執行完畢 p.shutdown(wait=True) for future in l: parse(future.result()) print('完成時間:',time.time()-start) #完成時間: 13.209137678146362
def task(name): print("%s %s is running" %(name,os.getpid())) time.sleep(random.randint(1,3)) if __name__ == '__main__': p = ProcessPoolExecutor(4) # 設定程序池內程序 for i in range(10): # 非同步呼叫方式,只呼叫,不等返回值 p.submit(task,'程序pid:') # 傳參方式(任務名,引數),引數使用位置引數或者關鍵字引數 p.shutdown(wait=True) # 關閉程序池的入口,等待池內任務執行結束 print('主') ################## ################## # 另一個非同步呼叫的demo def get(url): print('%s GET %s' % (os.getpid(),url)) time.sleep(3) reponse = requests.get(url) if reponse.status_code == 200: res = reponse.text else: res = "下載失敗" parse(res) # 沒有返回值 def parse(res): time.sleep(1) print('%s 解析結果為%s' %(os.getpid(),len(res))) if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'https://www.sina.com.cn', 'https://www.tmall.com', 'https://www.jd.com', 'https://www.python.org', 'https://www.openstack.org', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', ] p = ProcessPoolExecutor(9) start = time.time() for url in urls: future = p.submit(get,url) p.shutdown(wait=True) print("完成時間",time.time()-start)# 完成時間 6.293345212936401
(1)程序池:非同步 + 回撥函數,,cpu密集型,同時執行,每個程序有不同的直譯器和記憶體空間,互不干擾
def get(url): print('%s GET %s' % (os.getpid(), url)) time.sleep(3) response = requests.get(url) if response.status_code == 200: res = response.text else: res = '下載失敗' return res def parse(future): time.sleep(1) # 傳入的是個物件,獲取返回值 需要進行result操作 res = future.result() print("res",) print('%s 解析結果為%s' % (os.getpid(), len(res))) if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'https://www.sina.com.cn', 'https://www.tmall.com', 'https://www.jd.com', 'https://www.python.org', 'https://www.openstack.org', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', ] p = ProcessPoolExecutor(9) start = time.time() for url in urls: future = p.submit(get,url) #模組內的回撥函數方法,parse會使用future物件的返回值,物件返回值是執行任務的返回值 #回撥應該是相當於parse(future) future.add_done_callback(parse) p.shutdown(wait=True) print("完成時間",time.time()-start)#完成時間 33.79998469352722
(2)執行緒池:非同步 + 回撥函數,IO密集型主要使用方式,執行緒池:執行操作為誰有空誰執行
def get(url): print("%s GET %s" %(current_thread().name,url)) time.sleep(3) reponse = requests.get(url) if reponse.status_code == 200: res = reponse.text else: res = "下載失敗" return res def parse(future): time.sleep(1) res = future.result() print("%s 解析結果為%s" %(current_thread().name,len(res))) if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'https://www.sina.com.cn', 'https://www.tmall.com', 'https://www.jd.com', 'https://www.python.org', 'https://www.openstack.org', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', ] p = ThreadPoolExecutor(4) start = time.time() for url in urls: future = p.submit(get,url) future.add_done_callback(parse) p.shutdown(wait=True) print("主",current_thread().name) print("完成時間",time.time()-start)#完成時間 32.52604126930237
到此這篇關於python中ThreadPoolExecutor執行緒池和ProcessPoolExecutor程序池的文章就介紹到這了,更多相關python ThreadPoolExecutor 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45