<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
multiprocess不是一個模組而是python中一個操作、管理程序的包。
子模組分為四個部分:
process模組是一個建立程序的模組,藉助這個模組,就可以完成程序的建立。
在windows中使用process模組的注意事項
在Windows作業系統中由於沒有fork(linux作業系統中建立程序的機制),在建立子程序的時候會自動 import 啟動它的這個檔案,而在 import 的時候又執行了整個檔案。
因此如果將process()直接寫在檔案中就會無限遞迴建立子程序報錯。所以必須把建立子程序的部分使用if __name__ =='__main__'
判斷保護起來,import 的時候,就不會遞迴執行了。
在一個python程序中開啟子程序,start方法和並行效果。
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) print('我是子程序') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() # p.join() print('我是父程序')
import os from multiprocessing import Process def f(x): print('子程序id :',os.getpid(),'父程序id :',os.getppid()) return x*x if __name__ == '__main__': print('主程序id :', os.getpid()) p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start()
注意,子程序的執行順序不是根據啟動順序決定的。
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) p.join() # [p.join() for p in p_lst] print('父程序在執行')
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name) p1=MyProcess('wupeiqi') p2=MyProcess('yuanhao') p3=MyProcess('nezha') p1.start() # start會自動呼叫run p2.start() # p2.run() p3.start() p1.join() p2.join() p3.join() print('主執行緒')
from multiprocessing import Process def work(): global n n=0 print('子程序內: ',n) if __name__ == '__main__': n = 100 p=Process(target=work) p.start() print('主程序內: ',n)
會隨著主程序的結束而結束。
主程序建立守護行程
其一:守護行程會在主程序程式碼執行結束後就終止
其二:守護行程內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children
注意:程序之間是互相獨立的,主程序程式碼執行結束,守護行程隨即終止。
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) p=Myprocess('哪吒') p.daemon=True # 一定要在p.start()前設定,設定p為守護行程,禁止p建立子程序,並且父程序程式碼執行結束,p即終止執行 p.start() time.sleep(10) # 在sleep時檢視程序id對應的程序ps -ef|grep id print('主')
from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------") # 列印該行則主程序程式碼結束,則守護行程p1應該被終止.#可能會有p1任務執行的列印資訊123,因為主程序列印main----時,p1也執行了,但是隨即被終止,p2可以列印出來。
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': # windows下start程序一定要寫到這下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr))
from multiprocessing import Process import time import random class Myprocess(Process): def __init__(self,person): self.name=person super().__init__() def run(self): print('%s正在和網紅臉聊天' %self.name) time.sleep(random.randrange(1,5)) print('%s還在和網紅臉聊天' %self.name) p1=Myprocess('哪吒') p1.start() p1.terminate()#關閉程序,不會立即關閉,所以is_alive立刻檢視的結果可能還是存活 print(p1.is_alive()) #結果為True print('開始') print(p1.is_alive()) #結果為False
class Myprocess(Process): def __init__(self,person): self.name=person # name屬性是Process中的屬性,標示程序的名字 super().__init__() # 執行父類別的初始化方法會覆蓋name屬性 # self.name = person # 在這裡設定就可以修改程序名字了 # self.person = person # 如果不想覆蓋程序名,就修改屬性名稱就可以了 def run(self): print('%s正在和網紅臉聊天' %self.name) # print('%s正在和網紅臉聊天' %self.person) time.sleep(random.randrange(1,5)) print('%s正在和網紅臉聊天' %self.name) # print('%s正在和網紅臉聊天' %self.person) p1=Myprocess('哪吒') p1.start() print(p1.pid) #可以檢視子程序的程序id
Process([group [, target [, name [, args [, kwargs]]]]]),由該類範例化得到的物件,表示一個子程序中的任務(尚未啟動)
強調:
1.需要使用關鍵字的方式來指定引數
2.args指定的為傳給target函數的位置引數,是一個元組形式,必須有逗號
引數介紹:
•group引數未使用,值始終為None
•target表示呼叫物件,即子程序要執行的任務
•args表示呼叫物件的位置引數元組,args=(1,2,'egon',)
•kwargs表示呼叫物件的字典,kwargs={'name':'egon','age':18}
•name為子程序的名稱
1、 方法介紹
•p.start():啟動程序,並呼叫該子程序中的p.run()
•p.run():程序啟動時執行的方法,正是它去呼叫target指定的函數,我們自定義類的類中一定要實現該方法
•p.terminate():強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了殭屍程序,使用該方法需要特別小心這種情況。如果p還儲存了一個鎖那麼也將不會被釋放,進而導致死鎖
•p.is_alive():如果p仍然執行,返回True
•p.join([timeout]):主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於執行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的程序,而不能join住run開啟的程序
2、 屬性介紹
•p.daemon:預設值為False,如果設為True,代表p為後臺執行的守護行程,當p的父程序終止時,p也隨之終止,並且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定
•p.name:程序的名稱
•p.pid:程序的pid
•p.exitcode:程序在執行時為None、如果為–N,表示被訊號N結束(瞭解即可)
•p.authkey:程序的身份驗證鍵,預設是由os.urandom()隨機生成的32字元的字串。這個鍵的用途是為涉及網路連線的底層程序間通訊提供安全性,這類連線只有在具有相同的身份驗證鍵時才能成功(瞭解即可)
當多個程序使用同一份資料資源的時候,就會引發資料安全或順序混亂問題。
import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s:%s is done' % (n, os.getpid())) if __name__ == '__main__': for i in range(3): p = Process(target=work, args=(i,)) p.start() # 0: 15620 is running # 1: 19688 is running # 2: 15892 is running # 1:19688 is done # 0:15620 is done # 2:15892 is done
由並行變成了序列,犧牲了執行效率,但避免了競爭,確實會浪費了時間,卻保證了資料的安全。
import os import time import random from multiprocessing import Process,Lock def work(lock,n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,i)) p.start() # 1: 24776 is running # 1: 24776 is done # 0: 23588 is running # 0: 23588 is done # 2: 27308 is running # 2: 27308 is done
# 檔案db的內容為:{"count":5} # 注意一定要用雙引號,不然json無法識別 # 並行執行,效率高,但競爭寫同一檔案,資料寫入錯亂 from multiprocessing import Process, Lock import time, json, random def search(): dic = json.load(open('db')) print('剩餘票數%s' % dic['count']) def get(): dic = json.load(open('db')) time.sleep(random.random()) # 模擬讀資料的網路延遲 if dic['count'] > 0: dic['count'] -= 1 time.sleep(random.random()) # 模擬寫資料的網路延遲 json.dump(dic, open('db', 'w')) print('購票成功') else: print('購票失敗') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(10): # 模擬並行10個使用者端搶票 p = Process(target=task, args=(lock,)) p.start()
雖然可以用檔案共用資料實現程序間通訊,但問題是:
因此我們最好找尋一種解決方案能夠兼顧:
佇列和管道都是將資料存放於記憶體中,佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,我們應該儘量避免使用共用資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。
建立共用的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。
Queue([maxsize])
建立共用的程序佇列。
引數 :maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。
底層佇列使用管道和鎖定實現。另外,還需要執行支援執行緒以便佇列中的資料傳輸到底層管道中。
q.get( [ block [ ,timeout ] ] )
:返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。block用於控制阻塞行為,預設為True. 如果設定為False,將引發Queue.Empty異常(定義在Queue模組中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有專案變為可用,將引發Queue.Empty異常。q.get_nowait()
:同q.get(False)
方法。q.put(item [, block [,timeout ] ] )
:將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,預設為True。如果設定為False,將引發Queue.Empty異常(定義在Queue庫模組中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。q.qsize()
:返回佇列中目前專案的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,佇列中可能新增或刪除了專案。在某些系統上,此方法可能引發NotImplementedError異常。q.empty()
:如果呼叫此方法時 q為空,返回True。如果其他程序或執行緒正在往佇列中新增專案,結果是不可靠的。也就是說,在返回和使用結果之間,佇列中可能已經加入新的專案。q.full()
:如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()
方法)。q.close()
:關閉佇列,防止佇列中加入更多資料。呼叫此方法時,後臺執行緒將繼續寫入那些已入佇列但尚未寫入的資料,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動呼叫此方法。關閉佇列不會在佇列使用者中生成任何型別的資料結束訊號或異常。例如,如果某個使用者正被阻塞在get()
操作上,關閉生產者中的佇列不會導致get()
方法返回錯誤。q.cancel_join_thread()
:不會再程序退出時自動連線後臺執行緒。這可以防止join_thread()
方法阻塞。q.join_thread()
:連線佇列的後臺執行緒。此方法用於在呼叫q.close()
方法後,等待所有佇列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序呼叫。呼叫q.cancel_join_thread()
方法可以禁止這種行為。這個例子還沒有加入程序通訊,只是先來看看佇列為我們提供的方法,以及這些方法的使用和現象。
''' multiprocessing模組支援程序間通訊的兩種主要形式:管道和佇列 都是基於訊息傳遞實現的,但是佇列介面 ''' from multiprocessing import Queue q = Queue(3) # put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(2) q.put(1) # q.put(3) # 如果佇列已經滿了,程式就會停在這裡,等待資料被別人取走,再將資料放入佇列。 # 如果佇列中的資料一直不被取走,程式就會永遠停在這裡。 try: q.put_nowait(3) # 可以使用put_nowait,如果佇列滿了不會阻塞,但是會因為佇列滿了而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去,但是會丟掉這個訊息。 print('佇列已經滿了') # 因此,我們再放入資料之前,可以先看一下佇列的狀態,如果已經滿了,就不繼續put了。 print(q.full()) # True print(q.get()) # 3 print(q.get()) # 2 print(q.get()) # 1 # print(q.get()) # 同put方法一樣,如果佇列已經空了,那麼繼續取就會出現阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果佇列滿了不會阻塞,但是會因為沒取到值而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去。 print('佇列已經空了') print(q.empty()) # True,空了
一個queue的簡單應用,使用佇列q物件呼叫get函數來取得佇列中最先進入的資料。
import time from multiprocessing import Process, Queue def f(q): q.put([time.asctime(), 'from Eva', 'hello']) # 呼叫主函數中p程序傳遞過來的程序引數 put函數為向佇列中新增一條資料。 if __name__ == '__main__': q = Queue() # 建立一個Queue物件 p = Process(target=f, args=(q,)) # 建立一個程序 p.start() print(q.get()) p.join() # ['Mon Dec 9 18:27:08 2019', 'from Eva', 'hello']
import os import time import multiprocessing # 向queue中輸入資料的函數 def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.asctime()) queue.put(info) # 向queue中輸出資料的函數 def outputQ(queue): info= queue.get() print('%s%s%s' % (str(os.getpid()), '(get):', info)) # Main if __name__ == '__main__': multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 輸入程序 for i in range(10): process = multiprocessing.Process(target=inputQ, args=(queue,)) process.start() record1.append(process) # 輸出程序 for i in range(10): process = multiprocessing.Process(target=outputQ, args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join() # 17568(get):3208(put):Mon Dec 9 18:29:17 2019 # 27620(get):24024(put):Mon Dec 9 18:29:17 2019 # 19780(get):21716(put):Mon Dec 9 18:29:17 2019 # 27576(get):27608(put):Mon Dec 9 18:29:17 2019 # 11304(get):10668(put):Mon Dec 9 18:29:18 2019 # 19732(get):20548(put):Mon Dec 9 18:29:18 2019 # 18120(get):25360(put):Mon Dec 9 18:29:18 2019 # 24752(get):21764(put):Mon Dec 9 18:29:18 2019 # 19848(get):7604(put):Mon Dec 9 18:29:18 2019 # 13888(get):10376(put):Mon Dec 9 18:29:18 2019
在並行程式設計中使用生產者和消費者模式能夠解決絕大多數並行問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。
線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
import os import random import time from multiprocessing import Process, Queue def producer(q): for i in range(10): time.sleep(random.randint(1, 3)) res = '包子%s' % i q.put(res) print('%s 生產了 %s' % (os.getpid(), res)) def consumer(q): while True: res = q.get() time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() # 生產者們:即廚師們 p1 = Process(target=producer, args=(q,)) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) # 開始 p1.start() c1.start() print('主')
此時的問題是主程序永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死迴圈中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢後,往佇列中再發一個結束訊號,這樣消費者在接收到結束訊號後就可以break出死迴圈。
注意:結束訊號None,不一定要由生產者發,主程序裡同樣可以發,但主程序需要等生產者結束後才應該傳送該訊號。
from multiprocessing import Process, Queue import time, random, os def producer(q): for i in range(10): time.sleep(random.randint(1, 3)) res = '包子%s' % i q.put(res) print('%s 生產了 %s' % (os.getpid(), res)) q.put(None)# 傳送結束訊號 def consumer(q): while True: res = q.get() if res is None: break # 收到結束訊號則結束 time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() # 生產者們:即廚師們 p1 = Process(target=producer, args=(q,)) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) # 開始 p1.start() c1.start() print('主')
from multiprocessing import Process, Queue import time, random, os def producer(q): for i in range(2): time.sleep(random.randint(1, 3)) res = '包子%s' % i q.put(res) print('%s 生產了 %s' % (os.getpid(), res)) def consumer(q): while True: res = q.get() if res is None: break # 收到結束訊號則結束 time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() # 生產者們:即廚師們 p1 = Process(target=producer, args=(q,)) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) # 開始 p1.start() c1.start() p1.join() q.put(None)# 傳送結束訊號 print('主')
但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決
from multiprocessing import Process, Queue import time, random, os def producer(name, q): for i in range(2): time.sleep(random.randint(1, 3)) res = '%s%s' % (name, i) q.put(res) print('%s 生產了 %s' % (os.getpid(), res)) def consumer(q): while True: res = q.get() if res is None: break # 收到結束訊號則結束 time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() # 生產者們:即廚師們 p1 = Process(target=producer, args=('包子', q)) p2 = Process(target=producer, args=('骨頭', q)) p3 = Process(target=producer, args=('泔水', q)) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) # 開始 p1.start() p2.start() p3.start() c1.start() p1.join() # 必須保證生產者全部生產完畢,才應該傳送結束訊號 p2.join() p3.join() q.put(None) # 有幾個消費者就應該傳送幾次結束訊號None q.put(None) # 傳送結束訊號 print('主')
建立可連線的共用程序佇列。這就像是一個Queue物件,但佇列允許專案的使用者通知生產者專案已經被成功處理。通知程序是使用共用的訊號和條件變數來實現的。
JoinableQueue的範例p除了與Queue物件相同的方法之外,還具有以下方法:
q.task_done()
:使用者使用此方法發出訊號,表示q.get()返回的專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除的專案數量,將引發ValueError異常。q.join()
:生產者將使用此方法進行阻塞,直到佇列中所有專案均被處理。阻塞將持續到為佇列中的每個專案均呼叫q.task_done()方法為止。from multiprocessing import Process, JoinableQueue import time, random, os def producer(name, q): for i in range(10): time.sleep(random.randint(1, 3)) res = '%s%s' % (name, i) q.put(res) print('%s 生產了 %s' % (os.getpid(), res)) q.join() # 生產完畢,使用此方法進行阻塞,直到佇列中所有專案均被處理。 def consumer(q): while True: res = q.get() time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (os.getpid(), res)) q.task_done() # 向q.join()傳送一次訊號,證明一個資料已經被取走了 if __name__ == '__main__': q = JoinableQueue() # 生產者們:即廚師們 p1 = Process(target=producer, args=('包子', q)) p2 = Process(target=producer, args=('骨頭', q)) p3 = Process(target=producer, args=('泔水', q)) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) c1.daemon = True c2.daemon = True # 開始 p_l = [p1, p2, p3, c1, c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主') # 主程序等--->p1,p2,p3等---->c1,c2 # p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到佇列的資料 # 因而c1,c2也沒有存在的價值了,不需要繼續阻塞在程序中影響主程序了。應該隨著主程序的結束而結束,所以設定成守護行程就可以了。
Pool([numprocess [,initializer [, initargs]]])
:建立程序池
cpu_count()
的值p.apply(func [, args [, kwargs]])
:在一個池工作程序中執行func(*args,**kwargs),然後返回結果。需要強調的是:此操作並不會在所有池工作程序中並執行func函數。如果要通過不同引數並行地執行func函數,必須從不同執行緒呼叫p.apply()
函數或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]])
:在一個池工作程序中執行func(*args,**kwargs),然後返回結果。此方法的結果是AsyncResult類的範例,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。p.close()
:關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成P.join()
:等待所有工作程序退出。此方法只能在close()
或teminate()
之後呼叫obj.get()
:返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠端操作中引發了異常,它將在呼叫此方法時再次被引發。obj.ready()
:如果呼叫完成,返回Trueobj.successful()
:如果呼叫完成且沒有引發異常,返回True,如果在結果就緒之前呼叫此方法,引發異常obj.wait([timeout])
:等待結果變為可用。obj.terminate()
:立即終止所有工作程序,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動呼叫此函數。import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步呼叫,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞 # 但不管該任務是否存在阻塞,同步呼叫都會在原地等著 print(res_l)
import os import time import random from multiprocessing import Pool def work(n): print('%s run' % os.getpid()) time.sleep(random.random()) return n ** 2 if __name__ == '__main__': p = Pool(3) # 程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務 res_l = [] for i in range(10): res= p.apply_async(work, args=(i,)) # 非同步執行,根據程序池中有的程序數,每次最多3個子程序在非同步執行 # 返回結果之後,將結果放入列表,歸還程序,之後再執行新的任務 # 需要注意的是,程序池中的三個程序不會同時開啟或者同時結束 # 而是執行完一個就釋放一個程序,這個程序就去接收新的任務。 res_l.append(res) # 非同步apply_async用法:如果使用非同步提交的任務,主程序需要使用join,等待程序池內任務都處理完,然後可以用get收集結果 # 否則,主程序結束,程序池可能還沒來得及執行,也就跟著一起結束了 p.close() p.join() for res in res_l: print(res.get() ) # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
#Pool內的程序數預設是cpu核數,假設為4(檢視方法os.cpu_count()) #開啟6個使用者端,會發現2個使用者端處於等待狀態 #在每個程序內檢視pid,會發現pid使用為4個,即多個使用者端公用4個程序 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('程序pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個使用者端能存取
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
發現:並行開啟多個使用者端,伺服器端同一時間只有4個不同的pid,只能結束一個使用者端,另外一個使用者端才會進來。
需要回撥函數的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序:我好了額,你可以處理我的結果了。主程序則呼叫一個函數去處理該結果,該函數即回撥函數
我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回撥函數(主程序負責執行),這樣主程序在執行回撥函數時就省去了I/O的過程,直接拿到的是任務的結果。
from multiprocessing import Pool import requests import json import os def get_page(url): print('<程序%s> get %s' % (os.getpid(), url)) respone = requests.get(url) if respone.status_code == 200: return {'url': url, 'text': respone.text} def pasrse_page(res): print('<程序%s> parse %s' % (os.getpid(), res['url'])) parse_res = 'url:<%s> size:[%s]n' % (res['url'], len(res['text'])) with open('db.txt', 'a') as f: f.write(parse_res) if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p = Pool(3) res_l = [] for url in urls: res =p.apply_async(get_page, args=(url,), callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) # 拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回撥函數處理了 ''' 列印結果: <程序3388> get https://www.baidu.com <程序3389> get https://www.python.org <程序3390> get https://www.openstack.org <程序3388> get https://help.github.com/ <程序3387> parse https://www.baidu.com <程序3389> get http://www.sina.com.cn/ <程序3387> parse https://www.python.org <程序3387> parse https://help.github.com/ <程序3387> parse http://www.sina.com.cn/ <程序3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>rn...',...}] '''
import re from urllib.request import urlopen from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') return pattern,response def parse_page(info): pattern,page_content=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern), callback=parse_page) res_l.append(res) for i in res_l: i.get()
如果在主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回撥函數。
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待程序池中所有程序執行完畢 nums=[] for res in res_l: nums.append(res.get() ) #拿到所有結果 print(nums) #主程序拿到所有的處理結果,可以在主程序中進行統一進行處理
程序池的其他實現方法:https://docs.python.org/dev/library/concurrent.futures.html
到此這篇關於Python程序操作模組的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支援it145.com。
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45