首頁 > 軟體

Python多執行緒與同步機制淺析

2022-12-23 14:00:14

執行緒實現

Python中執行緒有兩種方式:函數或者用類來包裝執行緒物件。threading模組中包含了豐富的多執行緒支援功能:

  • threading.currentThread(): 返回當前執行緒;
  • threading.enumerate(): 返回包含正在執行的執行緒列表;
  • threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())等價。

Thread類

通過Thread類來處理執行緒,類中提供的一些方法:

  • run(): 用以表示執行緒執行的方法(可過載實現實際功能);
  • start(): 啟動執行緒;
  • join([time]): 等待執行緒中止(或者超時);
  • isAlive(): 返回執行緒是否活動;
  • getName(): 返回執行緒名;
  • setName(): 設定執行緒名;
  • setDaemon(True):設定為後臺程序(必須在start呼叫前設定)。

函數方式

通過Thread直接構造執行緒,然後通過start方法啟動執行緒:

threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)

各引數說明:

  • group:指定執行緒隸屬的執行緒組(當前忽略);
  • target:指定執行緒要排程的目標方法(即實現功能的函數);
  • args:傳遞給目標方法的引數(以元組的方式);
  • kwargs:傳遞給目標方法的引數(以字典的方式);
  • daemon:指定執行緒是否為後臺執行緒。
def simpleRoutine(name, delay):
    print(f"routine {name} starting...")
    time.sleep(delay)
    print(f"routine {name} finished")
if __name__ == '__main__':
    thrOne = threading.Thread(target=simpleRoutine, args=("First", 1))
    thrTwo = threading.Thread(target=simpleRoutine, args=("Two", 2))
    thrOne.start()
    thrTwo.start()
    thrOne.join()
    thrTwo.join()

繼承方式

直接繼承Thread,建立一個新的子類(主要實現run方法):

class SimpleThread (threading.Thread):
    def __init__(self, name, delay):
        # threading.Thread.__init__(self)
        super().__init__()
        self.name = name
        self.delay = delay
    def run(self):
        print(f"thread {self.name} starting...")
        time.sleep(self.delay)
        print(f"thread {self.name} finished")
if __name__ == '__main__':
    thrOne = SimpleThread("First", 2)
    thrTwo = SimpleThread("Second", 2)
    thrOne.start()
    thrTwo.start()
    thrOne.join()
    thrTwo.join()

同步機制

當多個執行緒同時修改同一條資料時可能會出現髒資料;所以,就需要執行緒鎖,即同一時刻只允許一個執行緒執行操作。

同步鎖Lock

threading提供了Lock和RLock(可重入鎖)兩個類,它們都提供瞭如下兩個方法來加鎖和釋放鎖:

  • acquire(blocking=True, timeout=-1):加鎖,其中 timeout 引數指定加鎖多少秒。
  • release():釋放鎖。

兩種使用鎖的方式:

gCount = 0
def PlusOne(locker):
    global gCount
      with locker:
          gCount += 1、
def MinusOne(locker):
    global gCount
      if locker.acquire():
          gCount -= 1
          locker.release()

條件變數Condition

Condition物件內部維護了一個鎖(構造時可傳遞一個Lock/RLock物件,否則內部會自行建立一個RLock)和一個waiting池:

  • 通過acquire獲得Condition物件;
  • 當呼叫wait方法時,執行緒會釋放Condition內部的鎖並進入blocked狀態,同時在waiting池中記錄這個執行緒;
  • 當呼叫notify方法時,Condition物件會從waiting池中挑選一個執行緒,通知其呼叫acquire方法嘗試取到鎖。

Condition物件:

__init__(self,lock=None):Condition類總是與一個鎖相關聯(若不指定lock引數,會自動建立一個與之繫結的RLock物件);

acquire(timeout):呼叫關聯鎖的acquire()方法;

release():呼叫關聯鎖的release()方法

wait(timeout):執行緒掛起,直到收到一個notify通知或超時才會被喚醒;必須在已獲得鎖的前提下呼叫;

notify(n=1):喚醒waiting池中的n個正在等待的執行緒並通知它:

  • 收到通知的執行緒將自動呼叫acquire()方法嘗試加鎖;
  • 若waiting池中有多個執行緒,隨機選擇n個喚醒;
  • 必須在已獲得鎖的前提下呼叫,否則將引發錯誤。

notify_all():通知所有執行緒。

class Producer(threading.Thread):
    def __init__(self, cond, storage):
        threading.Thread.__init__(self)
        self.cond = cond
        self.storage = storage
    def run(self):
        label = 1
        while True:
            with self.cond:
                if len(self.storage) < 10:
                    self.storage.append(label)
                    print(f"<- Produce {label} product")
                    label += 1
                    self.cond.notify(2)
                else:
                    print(f"<- storage full: Has Produced {label - 1} product")
                    self.cond.notify_all()
                    self.cond.wait()
                time.sleep(0.4)
class Consumer(threading.Thread):
    def __init__(self, name, cond, storage):
        threading.Thread.__init__(self)
        self.name = name
        self.cond = cond
        self.storage = storage
    def run(self):
        while True:
            if self.cond.acquire():
                if len(self.storage) > 1:
                    pro = self.storage.pop(0)
                    print(f"-> {self.name} consumed {pro}")
                    self.cond.notify()
                else:
                    print(f"-> {self.name} storage empty: no product to consume")
                    self.cond.wait()
                self.cond.release()
                time.sleep(1)

號誌Semaphore

號誌物件內部維護一個計數器:

  • acquire(blocking=True,timeout=None)時減1,當計數為0就阻塞請求的執行緒;
  • release()時加1,當計數大於0恢復被阻塞的執行緒;

threading中有Semaphore和BoundedSemaphore兩個號誌;BoundedSemaphore限制了release的次數,任何時候計數器的值,都不不能大於初始值(release時會檢測計數器的值,若大於等於初始值,則丟擲ValueError異常)。

通過Semaphore維護生產(release一個)、消費(acquire一個)量:

# products = threading.Semaphore(0)
def produceOne(label, sem: threading.Semaphore):
    sem.release()
    print(f"{label} produce one")
def consumeOne(label, sem: threading.Semaphore):
    sem.acquire()
    print(f"{label} consume one")

通過BoundedSemaphore來控制並行數量(最多有Semaphore初始值數量的執行緒並行):

# runner = threading.BoundedSemaphore(3)
def runBound(name, sem: threading.BoundedSemaphore):
    with sem:
        print(f"{name} is running")
        time.sleep(1)
        print(f"{name} finished")

事件Event

事件物件內部有個標誌欄位,用於執行緒等待事件的發生:

  • isSet():返回event的狀態值;
  • wait():狀態為False時,一直阻塞;否則立即返回;
  • set(): 設定狀態值為True,啟用所有被阻塞的執行緒;
  • clear():恢復狀態值為False。

多執行緒等待事件發生,然後開始執行:

def waiters(name, evt: threading.Event):
    evt.wait()
    print(f"{name} is running")
    time.sleep(1)
    print(f"{name} finished")
def starting(evt: threading.Event):
    evt.set()
    print("event is set")

屏障Barrier

屏障用於設定等待執行緒數量,當數量達到指定值時,開始執行:

threading.Barrier(parties, action=None, timeout=None)

屏障屬性與方法:

  • wait(timeout=None):等待通過屏障;執行緒被阻塞,直到阻塞的數量達到parties時,被阻塞的執行緒被同時全部釋放;
  • reset():重置屏障到預設的空狀態;
  • abort():將障礙置為斷開狀態;導致等待的執行緒引發BrokenBarrierError異常;
  • partier():通過障礙所需的執行緒數;
  • n_waiting():當前在屏障中等待的執行緒數;
  • broken():如果屏障處於斷開狀態,則返回True。
def waitBarrier(name, barr: threading.Barrier):
    print(f"{name} waiting for open")
    try:
        barr.wait()
        print(f"{name} running")
        time.sleep(5)
    except threading.BrokenBarrierError:
        print(f"{name} exception")
    print(f"{name} finished")

GIL全域性直譯器鎖

GIL(Global Interpreter Lock,全域性直譯器鎖);cpython中,某個執行緒想要執行,必須先拿到GIL(可以把GIL看作是“通行證”)。每次釋放GIL鎖,執行緒都要進行鎖競爭,切換執行緒,會消耗資源。

由於GIL鎖的存在,python裡一個程序永遠只能同時執行一個執行緒(拿到GIL的執行緒),這就是為什麼在多核CPU上,python的多執行緒效率並不高:

  • CPU密集型程式碼:由於計算工作多,會很快用完時間片,然後觸發GIL的釋放與再競爭;
  • IO密集型程式碼(檔案處理、網路爬蟲等):多執行緒能夠有效提升效率(單執行緒下有IO操作會進行IO等待,造成不必要的時間浪費,而開啟多執行緒能線上程A等待時,自動切換到執行緒B,可以不浪費CPU的資源,從而能提升程式執行效率)。

python在使用多執行緒的時候,呼叫的是c語言的原生執行緒:

  • 拿到公共資料
  • 申請GIL
  • python直譯器呼叫os原生執行緒
  • os操作cpu執行運算
  • 當執行緒執行時間到後,就進行切換(context switch)

到此這篇關於Python多執行緒與同步機制淺析的文章就介紹到這了,更多相關Python多執行緒內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com