首頁 > 軟體

python 多執行緒實現多工的方法範例

2021-07-22 10:00:09

1 多執行緒實現多工

1.1 什麼是執行緒?

        程序是作業系統分配程式執行資源的單位,而執行緒是程序的一個實體,是CPU排程和分配的單位。一個程序肯定有一個主執行緒,我們可以在一個程序裡建立多個執行緒來實現多工。

1.2 一個程式實現多工的方法

實現多工,我們可以用幾種方法。

(1)在主程序裡面開啟多個子程序,主程序和多個子程序一起處理任務。

(2)在主程序裡開啟多個子執行緒,主執行緒和多個子執行緒一起處理任務。

(3)在主程序裡開啟多個協程,多個協程一起處理任務。

        注意:因為用多個執行緒一起處理任務,會產生執行緒安全問題,所以在開發中一般使用多程序+多協程來實現多工。

1.3 多執行緒的建立方式

1.3.1 建立threading.Thread物件

import threading
p1 = threading.Thread(target=[函數名],args=([要傳入函數的引數]))
p1.start()  # 啟動p1執行緒

        我們來模擬一下多執行緒實現多工。

        假如你在用網易雲音樂一邊聽歌一邊下載。網易雲音樂就是一個程序。假設網易雲音樂內部程式是用多執行緒來實現多工的,網易雲音樂開兩個子執行緒。一個用來快取音樂,用於現在的播放。一個用來下載使用者要下載的音樂的。這時候的程式碼框架是這樣的:

import threading
import time
 
def listen_music(name):
    while True:
        time.sleep(1)
        print(name,"正在播放音樂")
 
 
def download_music(name):
    while True:
        time.sleep(2)
        print(name,"正在下載音樂")
 
 
if __name__ == '__main__':
    p1 = threading.Thread(target=listen_music,args=("網易雲音樂",))
    p2 = threading.Thread(target=download_music,args=("網易雲音樂",))
    p1.start()
    p2.start()

輸出:

觀察上面的輸出程式碼可以知道:

CPU是按照時間片輪詢的方式來執行子執行緒的。cpu內部會合理分配時間片。時間片到a程式的時候,a程式如果在休眠,就會自動切換到b程式。

嚴謹來說,CPU在某個時間點,只在執行一個任務,但是由於CPU執行速度和切換速度快,因為看起來像多個任務在一起執行而已。

1.3.2 繼承threading.Thread,並重寫run

        除了上面的方法建立執行緒,還有另一種方法。可以編寫一個類,繼承threaing.Thread類,然後重寫父類別的run方法。

import threading
import time
 
class MyThread(threading.Thread):
    def run(self):
        for i in range(5):
            time.sleep(1)
            print(self.name,i)
 
t1 = MyThread()
t2 = MyThread()
t3 = MyThread()
t1.start()
t2.start()
t3.start()

輸出:

        執行時無序的,說明已經啟用了多工。

下面是threading.Thread提供的執行緒物件方法和屬性:

  • start():建立執行緒後通過start啟動執行緒,等待CPU排程,為run函數執行做準備;
  • run():執行緒開始執行的入口函數,函數體中會呼叫使用者編寫的target函數,或者執行被過載的run函數;
  • join([timeout]):阻塞掛起呼叫該函數的執行緒,直到被呼叫執行緒執行完成或超時。通常會在主執行緒中呼叫該方法,等待其他執行緒執行完成。
  • name、getName()&setName():執行緒名稱相關的操作;
  • ident:整數型別的執行緒識別符號,執行緒開始執行前(呼叫start之前)為None;
  • isAlive()、is_alive():start函數執行之後到run函數執行完之前都為True;
  • daemon、isDaemon()&setDaemon():守護執行緒相關;

1.4 執行緒何時開啟,何時結束

(1)子執行緒何時開啟,何時執行 當呼叫thread.start()時 開啟執行緒,再執行執行緒的程式碼

(2)子執行緒何時結束 子執行緒把target指向的函數中的語句執行完畢後,或者執行緒中的run函數程式碼執行完畢後,立即結束當前子執行緒

(3)檢視當前執行緒數量 通過threading.enumerate()可列舉當前執行的所有執行緒

(4)主執行緒何時結束 所有子執行緒執行完畢後,主執行緒才結束

範例一:

import threading
import time
  
def run():
    for i in range(5):
        time.sleep(1)
        print(i)
 
t1 = threading.Thread(target=run)
t1.start()
print("我會在哪裡出現")

輸出:

        為什麼主程序(主執行緒)的程式碼會先出現呢?因為CPU採用時間片輪詢的方式,如果輪詢到子執行緒,發現他要休眠1s,他會先去執行主執行緒。所以說CPU的時間片輪詢方式可以保證CPU的最佳執行。

        那如果我想主程序輸出的那句話執行在結尾呢?該怎麼辦呢?這時候就需要用到 join() 方法了。

1.5 執行緒的 join() 方法

import threading
import time
 
def run():
    for i in range(5):
        time.sleep(1)
        print(i)
 
t1 = threading.Thread(target=run)
t1.start()
t1.join()  
print("我會在哪裡出現")

輸出:

        join() 方法可以阻塞主執行緒(注意只能阻塞主執行緒其他子執行緒是不能阻塞的),直到 t1 子執行緒執行完,再解阻塞。

1.6 多執行緒共用全域性變數出現的問題

        我們開兩個子執行緒,全域性變數是0,我們每個執行緒對他自加1,每個執行緒加一百萬次,這時候就會出現問題了,來,看程式碼:

import threading
import time
 
num = 0
 
def work1(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        temp = num
        num = temp + 1
    print(num)
 
 
def work2(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        temp = num
        num = temp + 1
    print(num)
 
 
if __name__ == '__main__':
    t1 = threading.Thread(target=work1,args=(1000000,))
    t2 = threading.Thread(target=work2, args=(1000000,))
    t1.start()
    t2.start()
 
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    print(num)

輸出

1459526 # 第一個子執行緒結束後全域性變數一共加到這個數
1588806 # 第二個子執行緒結束後全域性變數一共加到這個數
1588806 # 兩個執行緒都結束後,全域性變數一共加到這個數

        奇怪了,我不是每個執行緒都自加一百萬次嗎?照理來說,應該最後的結果是200萬才對的呀。問題出在哪裡呢?

        我們知道CPU是採用時間片輪詢的方式進行幾個執行緒的執行。

        假設我CPU先輪詢到work1(),num此時為100,在我執行到第10行時,時間結束了!此時,賦值了,但是還沒有自加!即temp=100num=100

        然後,時間片輪詢到了work2(),進行賦值自加。num=101了。

        又回到work1()的斷點處,num=temp+1,temp=100,所以num=101。

        就這樣!num少了一次自加!在次數多了之後,這樣的錯誤積累在一起,結果只得到158806!

        這就是執行緒安全問題

1.7 互斥鎖可以彌補部分執行緒安全問題。(互斥鎖和GIL鎖是不一樣的東西!)

        當多個執行緒幾乎同時修改某一個共用資料的時候,需要進行同步控制

        執行緒同步能夠保證多個執行緒安全存取競爭資源,最簡單的同步機制是引入互斥鎖。

        互斥鎖為資源引入一個狀態:鎖定/非鎖定

        某個執行緒要更改共用資料時,先將其鎖定,此時資源的狀態為「鎖定」,其他執行緒不能更改;直到該執行緒釋放資源,將資源的狀態變成「非鎖定」,其他的執行緒才能再次鎖定該資源。互斥鎖保證了每次只有一個執行緒進行寫入操作,從而保證了多執行緒情況下資料的正確性。

        互斥鎖有三個常用步驟:

lock = threading.Lock()  # 取得鎖
lock.acquire()  # 上鎖
lock.release()  # 解鎖

        下面讓我們用互斥鎖來解決上面例子的執行緒安全問題。

import threading
import time
 
num = 0
lock = threading.Lock()  # 取得鎖
def work1(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        lock.acquire()  # 上鎖
        temp = num
        num = temp + 1
        lock.release()  # 解鎖
    print(num)
 
 
def work2(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        lock.acquire()  # 上鎖
        temp = num
        num = temp + 1
        lock.release()  # 解鎖
    print(num)
 
 
if __name__ == '__main__':
    t1 = threading.Thread(target=work1,args=(1000000,))
    t2 = threading.Thread(target=work2, args=(1000000,))
    t1.start()
    t2.start()
 
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    print(num)

輸出:

1945267 # 第一個子執行緒結束後全域性變數一共加到這個數
2000000 # 第二個子執行緒結束後全域性變數一共加到這個數
2000000 # 兩個執行緒都結束後,全域性變數一共加到這個數

1.8 執行緒池ThreadPoolExecutor

        從Python3.2開始,標準庫為我們提供了concurrent.futures模組,它提供了ThreadPoolExecutorProcessPoolExecutor兩個類,實現了對threadingmultiprocessing的進一步抽象(這裡主要關注執行緒池),不僅可以幫我們自動排程執行緒,還可以做到:

  • 主執行緒可以獲取某一個執行緒(或者任務的)的狀態,以及返回值。
  • 當一個執行緒完成的時候,主執行緒能夠立即知道。
  • 讓多執行緒和多程序的編碼介面一致。

1.8.1 建立執行緒池

範例:

from concurrent.futures import ThreadPoolExecutor
import time
 
# 引數times用來模擬網路請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times
 
executor = ThreadPoolExecutor(max_workers=2)
# 通過submit函數提交執行的函數到執行緒池中,submit函數立即返回,不阻塞
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))
# done方法用於判定某個任務是否完成
print("1: ", task1.done())
# cancel方法用於取消某個任務,該任務沒有放入執行緒池中才能取消成功
print("2: ", task2.cancel())
time.sleep(4)
print("3: ", task1.done())
# result方法可以獲取task的執行結果
print("4: ", task1.result())

輸出:

  • ThreadPoolExecutor構造範例的時候,傳入max_workers引數來設定執行緒池中最多能同時執行的執行緒數目。
  • 使用submit函數來提交執行緒需要執行的任務(函數名和引數)到執行緒池中,並返回該任務的控制程式碼(類似於檔案、畫圖),注意submit()不是阻塞的,而是立即返回。
  • 通過submit函數返回的任務控制程式碼,能夠使用done()方法判斷該任務是否結束。上面的例子可以看出,由於任務有2s的延時,在task1提交後立刻判斷,task1還未完成,而在延時4s之後判斷,task1就完成了。
  • 使用cancel()方法可以取消提交的任務,如果任務已經線上程池中執行了,就取消不了。這個例子中,執行緒池的大小設定為2,任務已經在執行了,所以取消失敗。如果改變執行緒池的大小為1,那麼先提交的是task1,task2還在排隊等候,這是時候就可以成功取消。
  • 使用result()方法可以獲取任務的返回值。檢視內部程式碼,發現這個方法是阻塞的。

1.8.2 as_completed

        上面雖然提供了判斷任務是否結束的方法,但是不能在主執行緒中一直判斷啊。有時候我們是得知某個任務結束了,就去獲取結果,而不是一直判斷每個任務有沒有結束。這是就可以使用as_completed方法一次取出所有任務的結果。

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
# 引數times用來模擬網路請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times
 
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並不是真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
 
for future in as_completed(all_task):
    data = future.result()
    print("in main: get page {}s success".format(data))
 
# 執行結果
# get page 2s finished
# in main: get page 2s success
# get page 3s finished
# in main: get page 3s success
# get page 4s finished
# in main: get page 4s success

   as_completed()方法是一個生成器,在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield這個任務,就能執行for迴圈下面的語句,然後繼續阻塞住,迴圈到所有的任務結束。從結果也可以看出,先完成的任務會先通知主執行緒

1.8.3 map

        除了上面的as_completed方法,還可以使用executor.map方法,但是有一點不同。

from concurrent.futures import ThreadPoolExecutor
import time
 
# 引數times用來模擬網路請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times
 
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並不是真的url
 
for data in executor.map(get_html, urls):
    print("in main: get page {}s success".format(data))
# 執行結果
# get page 2s finished
# get page 3s finished
# in main: get page 3s success
# in main: get page 2s success
# get page 4s finished
# in main: get page 4s success

        使用map方法,無需提前使用submit方法map方法python標準庫中的map含義相同,都是將序列中的每個元素都執行同一個函數。上面的程式碼就是對urls的每個元素都執行get_html函數,並分配各執行緒池。可以看到執行結果與上面的as_completed方法的結果不同,輸出順序和urls列表的順序相同,就算2s的任務先執行完成,也會先列印出3s的任務先完成,再列印2s的任務完成。

1.8.4 wait

   wait方法可以讓主執行緒阻塞,直到滿足設定的要求。

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time
 
# 引數times用來模擬網路請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times
 
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並不是真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=ALL_COMPLETED)
print("main")
# 執行結果 
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main

   wait方法接收3個引數,等待的任務序列、超時時間以及等待條件。等待條件return_when預設為ALL_COMPLETED,表明要等待所有的任務都結束。可以看到執行結果中,確實是所有任務都完成了,主執行緒才列印出main。等待條件還可以設定為FIRST_COMPLETED,表示第一個任務完成就停止等待。

2 多程序實行多工

2.1 多執行緒的建立方式

建立程序的方式和建立執行緒的方式類似:

  • 範例化一個multiprocessing.Process的物件,並傳入一個初始化函數物件(initial function )作為新建程序執行入口;
  • 繼承multiprocessing.Process,並重寫run函數;

2.1.1 方式1

       在開始之前,我們要知道什麼是程序。道理很簡單,你平時電腦開啟QQ使用者端,就是一個程序。再開啟一個QQ使用者端,又是一個程序。那麼,在python中如何用一篇程式碼就可以開啟幾個程序呢?通過一個簡單的例子來演示:

import multiprocessing
import time
 
 
def task1():
    while True:
        time.sleep(1)
        print("I am task1")
 
def task2():
    while True:
        time.sleep(2)
        print("I am task2")
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1)  # multiprocessing.Process建立了子程序物件p1
    p2 = multiprocessing.Process(target=task2)  # multiprocessing.Process建立了子程序物件p2
    p1.start()  # 子程序p1啟動
    p2.start()  # 子程序p2啟動
    print("I am main task")  # 這是主程序的任務

輸出:

        可以看到子程序物件是由multiprocessing模組中的Process類建立的。除了p1,p2兩個被建立的子程序外。當然還有主程序。主程序就是我們從頭到尾的程式碼,包括子程序也是由主程序建立的。

注意的點有:

(1)首先解釋一下並行:並行就是當任務數大於cpu核數時,通過作業系統的各種任務排程演演算法,實現多個任務「一起」執行。(實際上總有一些任務不在執行,因為切換任務相當快,看上去想同時執行而已。)

(2)當是並行的情況下,子程序主程序的執行都是沒有順序的,CPU會採用時間片輪詢的方式,哪個程式先要執行就先執行哪個。

(3)主程序會預設等待所有子程序執行完畢後,它才會退出。所以在上面的例子中,p1,p2子程序是死迴圈程序,主程序的最後一句程式碼print("I am main task")雖然執行完了,但是主程序並不會關閉,他會一直等待著子程序。

(4)主程序預設建立的是非守護行程。注意,結合3.和5.看。

(5)但是!如果子程序守護行程的話,那麼主程序執行完最後一句程式碼後,主程序會直接關閉,不管你子程序執行完了沒有!

2.1.2 方式2

from multiprocessing import Process  
import os, time
 
class CustomProcess(Process):
    def __init__(self, p_name, target=None):
        # step 1: call base __init__ function()
        super(CustomProcess, self).__init__(name=p_name, target=target, args=(p_name,))
 
    def run(self):
        # step 2:
        # time.sleep(0.1)
        print("Custom Process name: %s, pid: %s "%(self.name, os.getpid()))
 
if __name__ == '__main__':
    p1 = CustomProcess("process_1")
    p1.start()
    p1.join()
    print("subprocess pid: %s"%p1.pid)
    print("current process pid: %s" % os.getpid())

輸出:

        這裡可以思考一下,如果像多執行緒一樣,存在一個全域性的變數share_data,不同程序同時存取share_data會有問題嗎?

        由於每一個程序擁有獨立的記憶體地址空間且互相隔離,因此不同程序看到的share_data是不同的、分別位於不同的地址空間,同時存取不會有問題。這裡需要注意一下。

2.2 守護行程

 測試下:

import multiprocessing
import time
 
 
def task1():
    while True:
        time.sleep(1)
        print("I am task1")
 
def task2():
    while True:
        time.sleep(2)
        print("I am task2")
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1)
    p2 = multiprocessing.Process(target=task2)
    p1.daemon = True  # 設定p1子程序為守護行程
    p2.daemon = True  # 設定p2子程序為守護行程
    p1.start()
    p2.start()
    print("I am main task")

輸出:

I am main task

輸出結果是不是有點奇怪。為什麼p1,p2子程序都沒有輸出的?

讓我們來整理一下思路:

  • 建立p1,p2子程序
  • 設定p1,p2子程序為守護行程
  • p1,p2子程序開啟
  • p1,p2子程序程式碼裡面都有休眠時間,所以cpu為了不浪費時間,先做主程序後續的程式碼。
  • 執行主程序後續的程式碼,print("I am main task")
  • 主程序後續的程式碼執行完成了,所以剩下的子程序是守護行程的,全都要關閉了。但是,如果主程序的程式碼執行完了,有兩個子程序,一個是守護的,一個非守護的,怎麼辦呢?其實,他會等待非守護的那個子程序執行完,然後三個程序一起關閉。
  • p1,p2還在休眠時間內就被終結生命了,所以什麼輸出都沒有。

例如,把P1設為非守護行程:

import multiprocessing
import time
 
 
def task1():
    i = 1
    while i < 5:
        time.sleep(1)
        i += 1
        print("I am task1")
 
def task2():
    while True:
        time.sleep(2)
        print("I am task2")
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1)
    p2 = multiprocessing.Process(target=task2)
    p2.daemon = True  # 設定p2子程序為守護行程
    p1.start()
    p2.start()
    print("I am main task")

輸出:

裡面涉及到兩個知識點:

(1)當主程序結束後,會發一個訊息給子程序(守護行程),守護行程收到訊息,則立即結束

(2)CPU是按照時間片輪詢的方式來執行多程序的。哪個合適的哪個執行,如果你的子程序裡都有time.sleep。那我CPU為了不浪費資源,肯定先去幹點其他的事情啊。

        那麼,守護行程隨時會被中斷,他的存在意義在哪裡的?

        其實,守護行程主要用來做與業務無關的任務,無關緊要的任務,可有可無的任務,比如記憶體垃圾回收,某些方法的執行時間的計時等。

2.3 建立的子程序要傳入引數

import multiprocessing
 
 
def task(a,b,*args,**kwargs):
    print("a")
    print("b")
    print(args)
    print(kwargs)
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task,args=(1,2,3,4,5,6),kwargs={"name":"chichung","age":23})
    p1.start()
    print("主程序已經執行完最後一行程式碼啦")

輸出:

        子程序要執行的函數需要傳入變數a,b,一個元組,一個字典。我們建立子程序的時候,變數a,b要放進元組裡面,task函數取的時候會把前兩個取出來,分別賦值給a,b了。

2.4 子程序幾個常用的方法

 

p.start 開始執行子執行緒
p.name 檢視子程序的名稱
p.pid 檢視子程序的id
p.is_alive 判斷子程序是否存活
p.join(timeout)

阻塞主程序,當子程序p執行完畢後,再解開阻塞,讓主程序執行後續的程式碼

如果timeout=2,就是阻塞主程序2s,這2s內主程序不能執行後續的程式碼。過了2s後,就運算元程序沒有執行完畢,主程序也能執行後續的程式碼

p.terminate 終止子程序p的執行

import multiprocessing
 
def task(a,b,*args,**kwargs):
    print("a")
    print("b")
    print(args)
    print(kwargs)
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task,args=(1,2,3,4,5,6),kwargs={"name":"chichung","age":23})
    p1.start()
    print("p1子程序的名字:%s" % p1.name)
    print("p1子程序的id:%d" % p1.pid)
    p1.join()
    print(p1.is_alive())

輸出:

2.5 程序之間是不可以共用全域性變數

        程序之間是不可以共用全域性變數的,即使子程序與主程序。道理很簡單,一個新的程序,其實就是佔用一個新的記憶體空間,不同的記憶體空間,裡面的變數肯定不能夠共用的。實驗證明如下:

範例一:

import multiprocessing
 
g_list = [123]
 
def task1():
    g_list.append("task1")
    print(g_list)
 
def task2():
    g_list.append("task2")
    print(g_list)
 
def main_process():
    g_list.append("main_processs")
    print(g_list)
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1)
    p2 = multiprocessing.Process(target=task2)
    p1.start()
    p2.start()
    main_process()
    print("11111: ", g_list)

輸出:

[123, 'main_processs']
11111: [123, 'main_processs']
[123, 'task1']
[123, 'task2']

 範例二:

import multiprocessing
import time
 
 
def task1(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        temp = num
        num = temp + 1
    print(num)
    print("I am task1")
 
def task2(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        temp = num
        num = temp + 1
    print(num)
    print("I am task2")
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1, args=(100000,)  # multiprocessing.Process建立了子程序物件p1
    p2 = multiprocessing.Process(target=task2, args=(100000,)  # multiprocessing.Process建立了子程序物件p2
    p1.start()  # 子程序p1啟動
    p2.start()  # 子程序p2啟動
    print("I am main task")  # 這是主程序的任務

輸出:

2.6 python程序池:multiprocessing.pool

        程序池可以理解成一個佇列,該佇列可以容易指定數量的子程序,當佇列被任務佔滿之後,後續新增的任務就得排隊,直到舊的程序有任務執行完空餘出來,才會去執行新的任務。

        在利用Python進行系統管理的時候,特別是同時操作多個檔案目錄,或者遠端控制多臺主機,並行操作可以節約大量的時間。當被操作物件數目不大時,可以直接利用multiprocessing中的Process動態成生多個程序,十幾個還好,但如果是上百個,上千個目標,手動的去限制程序數量卻又太過繁瑣,此時可以發揮程序池的功效。

        Pool可以提供指定數量的程序供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來它。

2.6.1 使用程序池(非阻塞)

#coding: utf-8
import multiprocessing
import time
 
def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
 
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3) # 設定程序的數量為3
    for i in range(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
 
    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #呼叫join之前,先呼叫close函數,否則會出錯。執行完close後不會有新的程序加入到pool,join函數等待所有子程序結束
    print("Sub-process(es) done.")

輸出:

函數解釋

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)
  • close() 關閉pool,使其不在接受新的任務。
  • terminate() 結束工作程序,不在處理未完成的任務。
  • join() 主程序阻塞,等待子程序的退出, join方法要在close或terminate之後使用。

apply(), apply_async():

  • apply(): 阻塞主程序, 並且一個一個按順序地執行子程序, 等到全部子程序都執行完畢後 ,繼續執行 apply()後面主程序的程式碼
  • apply_async() 非阻塞非同步的, 他不會等待子程序執行完畢, 主程序會繼續執行, 他會根據系統排程來進行程序切換

執行說明:建立一個程序池pool,並設定程序的數量為3,xrange(4)會相繼產生四個物件[0, 1, 2, 4],四個物件被提交到pool中,因pool指定程序數為3,所以0、1、2會直接送到程序中執行,當其中一個執行完事後才空出一個程序處理物件3,所以會出現輸出「msg: hello 3」出現在"end"後。因為為非阻塞,主函數會自己執行自個的,不搭理程序的執行,所以執行完for迴圈後直接輸出「mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~」,主程式在pool.join()處等待各個程序的結束。

2.6.2 使用程序池(阻塞)

#coding: utf-8
import multiprocessing
import time
 
def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
 
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3) # 設定程序的數量為3
    for i in range(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
 
    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #呼叫join之前,先呼叫close函數,否則會出錯。執行完close後不會有新的程序加入到pool,join函數等待所有子程序結束
    print("Sub-process(es) done.")

輸出:

2.6.3 使用程序池,並關注結果

import multiprocessing
import time
 
def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg
 
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print("Sub-process(es) done.")

輸出:

  :get()函數得出每個返回結果的值

3 python多執行緒與多程序比較

先來看兩個例子:

(1)範例一,多執行緒與單執行緒,開啟兩個python執行緒分別做一億次加一操作,和單獨使用一個執行緒做一億次加一操作:

import threading
import time
 
def tstart(arg):
    var = 0
    for i in range(100000000):
        var += 1
    print(arg, var)
 
if __name__ == '__main__':
    t1 = threading.Thread(target=tstart, args=('This is thread 1',))
    t2 = threading.Thread(target=tstart, args=('This is thread 2',))
    start_time = time.time()
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Two thread cost time: %s" % (time.time() - start_time))
    start_time = time.time()
    tstart("This is thread 0")
    print("Main thread cost time: %s" % (time.time() - start_time))

輸出:

 上面的例子如果只開啟t1和t2兩個執行緒中的一個,那麼執行時間和主執行緒基本一致。

 (2)範例二,使用兩個程序

from multiprocessing import Process  
import os, time
 
def pstart(arg):
    var = 0
    for i in range(100000000):
        var += 1
    print(arg, var)
 
if __name__ == '__main__':
    p1 = Process(target = pstart, args = ("1", ))
    p2 = Process(target = pstart, args = ("2", ))
    start_time = time.time()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("Two process cost time: %s" % (time.time() - start_time))
    start_time = time.time()
    pstart("0")
    print("Current process cost time: %s" % (time.time() - start_time))

輸出:

 對比分析:

        雙程序並行執行單程序執行相同的運算程式碼,耗時基本相同,雙程序耗時會稍微多一些,可能的原因是程序建立和銷燬會進行系統呼叫,造成額外的時間開銷。

        但是對於python執行緒,雙執行緒並行執行耗時比單執行緒要高的多,效率相差近10倍。如果將兩個並行執行緒改成序列執行,即:

import threading
import time
 
def tstart(arg):
    var = 0
    for i in range(100000000):
        var += 1
    print(arg, var)
 
if __name__ == '__main__':
    t1 = threading.Thread(target=tstart, args=('This is thread 1',))
    t2 = threading.Thread(target=tstart, args=('This is thread 2',))
    start_time = time.time()
    t1.start()
    t1.join()
    print("thread1 cost time: %s" % (time.time() - start_time))
    start_time = time.time()
    t2.start()
    t2.join()
    print("thread2 cost time: %s" % (time.time() - start_time))
    start_time = time.time()
    tstart("This is thread 0")
    print("Main thread cost time: %s" % (time.time() - start_time))

輸出:

可以看到三個執行緒序列執行,每一個執行的時間基本相同。

本質原因雙執行緒是並行執行的,而不是真正的並行執行。原因就在於GIL鎖

4 GIL鎖

        提起python多執行緒就不得不提一下GIL(Global Interpreter Lock 全域性直譯器鎖),這是目前佔統治地位的python直譯器CPython中為了保證資料安全所實現的一種鎖。不管程序中有多少執行緒,只有拿到了GIL鎖的執行緒才可以在CPU上執行,即使是多核處理器對一個程序而言,不管有多少執行緒,任一時刻,只會有一個執行緒在執行。對於CPU密集型的執行緒,其效率不僅僅不高,反而有可能比較低。python多執行緒比較適用於IO密集型的程式。對於的確需要並行執行的程式,可以考慮多程序。

        多執行緒對鎖的爭奪,CPU對執行緒的排程,執行緒之間的切換等均會有時間開銷。

5 執行緒和程序比較

5.1 執行緒和程序的區別

下面簡單的比較一下執行緒與程序

  • 程序是資源分配的基本單位,執行緒是CPU執行和排程的基本單位;
  • 通訊/同步方式:
    • 程序:
      • 通訊方式:管道,FIFO,訊息佇列,訊號,共用記憶體,socket,stream流;
      • 同步方式:PV號誌,管程
    • 執行緒:
      • 同步方式:互斥鎖,遞迴鎖,條件變數,號誌
      • 通訊方式:位於同一程序的執行緒共用程序資源,因此執行緒間沒有類似於程序間用於資料傳遞的通訊方式,執行緒間的通訊主要是用於執行緒同步。
  • CPU上真正執行的是執行緒,執行緒比程序輕量,其切換和排程代價比程序要小;
  • 執行緒間對於共用的程序資料需要考慮執行緒安全問題,由於程序之間是隔離的,擁有獨立的記憶體空間資源,相對比較安全,只能通過上面列出的IPC(Inter-Process Communication)進行資料傳輸;
  • 系統有一個個行程群組成,每個程序包含程式碼段、資料段、堆空間和棧空間,以及作業系統共用部分 ,有等待,就緒和執行三種狀態;
  • 一個程序可以包含多個執行緒,執行緒之間共用程序的資源(檔案描述符、全域性變數、堆空間等),暫存器變數和棧空間等是執行緒私有的;
  • 作業系統中一個程序掛掉不會影響其他程序,如果一個程序中的某個執行緒掛掉而且OS對執行緒的支援是多對一模型,那麼會導致當前程序掛掉;
  • 如果CPU和系統支援多執行緒與多程序,多個程序並行執行的同時,每個程序中的執行緒也可以並行執行,這樣才能最大限度的榨取硬體的效能;

5.2 執行緒和程序的上下文切換

程序切換過程切換牽涉到非常多的東西,暫存器內容儲存到任務狀態段TSS,切換頁表,堆疊等。簡單來說可以分為下面兩步:

  • 頁全域性目錄切換,使CPU到新程序的線性地址空間定址;
  • 切換核心態堆疊和硬體上下文,硬體上下文包含CPU暫存器的內容,存放在TSS中;

執行緒執行於程序地址空間,切換過程不涉及到空間的變換,只牽涉到第二步;

5.3 使用多執行緒還是多程序?

  • CPU密集型:程式需要佔用CPU進行大量的運算和資料處理;適合多程序;
  • I/O密集型:程式中需要頻繁的進行I/O操作;例如網路中socket資料傳輸和讀取等;適合多執行緒

        由於python多執行緒並不是並行執行,因此較適合與I/O密集型程式,多程序並行執行適用於CPU密集型程式;

python多執行緒實現多工:https://www.cnblogs.com/chichung/p/9566734.html

python通過多程序實行多工:https://www.cnblogs.com/chichung/p/9532962.html

python多執行緒與多程序及其區別:https://www.cnblogs.com/yssjun/p/11302500.html

python程序池:multiprocessing.pool:https://www.cnblogs.com/kaituorensheng/p/4465768.html

到此這篇關於python 多執行緒實現多工的方法範例的文章就介紹到這了,更多相關python 多執行緒實現多工的方法範例內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com