首頁 > 軟體

詳解Python中的多執行緒

2022-06-20 14:02:36

什麼是多執行緒:

  程序:正在執行的程式,QQ 360 ......

執行緒:就是程序中一條執行程式的執行路徑,一個程式至少有一條執行路徑。(360中的防毒 電腦體檢 電腦清理 同時執行的話就需要開啟多條路徑)

  每個執行緒都有自己需要執行的內容,而這些內容可以稱為執行緒要執行的任務。

  開啟多執行緒是為了同時執行多部分程式碼。

  好處:解決了多部分需要同時執行的問題

  弊端:如果執行緒過多,會導致效率很低(因為程式的執行都是CPU做著隨機 快速切換來完成的)

  • 執行緒與程序的區別
  • 執行緒共用記憶體,程序獨立記憶體
  • 執行緒啟動速度塊,程序啟動速度慢,執行時速度沒有可比性
  • 同一個程序的執行緒間可以直接交流,兩個程序想通訊,必須通過一箇中間代理來實現
  • 建立新執行緒很簡單,建立新程序需要對其父程序進行一次克隆
  • 一個執行緒可以控制和操作同一執行緒裡的其他執行緒,但是程序只能操作子程序

threading模組

多執行緒的使用方式一:直接使用

# -*- coding:utf-8 -*-
# 執行緒使用的方式一
import threading
import time
# 需要多執行緒執行的函數
def fun(args):
    print("我是執行緒%s" % args)
    time.sleep(2)
    print("執行緒%s執行結束" % args)
# 建立執行緒
t1 = threading.Thread(target=fun, args=(1,))
t2 = threading.Thread(target=fun, args=(2,))
start_time = time.time()
t1.start()
t2.start()
end_time = time.time()
print("兩個執行緒一共的執行時間為:", end_time-start_time)
print("主執行緒結束")
"""
執行結果:
我是執行緒1
我是執行緒2兩個執行緒一共的執行時間為: 0.0010077953338623047
主執行緒結束
執行緒1執行結束
執行緒2執行結束
"""

執行緒的第二種使用方式:繼承式呼叫

# 繼承式呼叫
import threading
import time
class MyThreading(threading.Thread):
    def __init__(self, name):
        super(MyThreading, self).__init__()
        self.name = name
    # 執行緒要執行的程式碼
    def run(self):
        print("我是執行緒%s" % self.name)
        time.sleep(2)
        print("執行緒%s執行結束" % self.name)
t1 = MyThreading(1)
t2 = MyThreading(2)
start_time = time.time()
t1.start()
t2.start()
end_time = time.time()
print("兩個執行緒一共的執行時間為:", end_time-start_time)
print("主執行緒結束")
"""
執行結果:
我是執行緒1
我是執行緒2
兩個執行緒一共的執行時間為: 0.0010724067687988281
主執行緒結束
執行緒2執行結束
執行緒1執行結束
"""

守護執行緒與join方法

  • 在Python多執行緒程式設計中,join方法的作用式執行緒同步。
  • 守護執行緒,是為守護別人而存在的,當設定為守護執行緒後,被守護的主執行緒不存在後,守護執行緒也自然不存在。
  • 第一種:python多執行緒預設情況
  • Python多執行緒預設情況(設定執行緒setDaemon(False)),主執行緒執行完自己的任務後,就退出了,此時子執行緒會繼續執行自己的任務,直到子執行緒任務結束
  • 程式碼演示:threading中的兩個建立多線成的例子都是。
  • 第二種:開啟守護執行緒
  • 開啟執行緒的setDaemon(True)),設定子執行緒為守護執行緒,實現主程式結束,子程式立馬全部結束功能
  • 程式碼演示:
# 守護執行緒
import threading
import time
class MyThreading(threading.Thread):
    def __init__(self, name):
        super(MyThreading, self).__init__()
        self.name = name
    # 執行緒要執行的程式碼
    def run(self):
        print("我是執行緒%s" % self.name)
        time.sleep(2)
        print("執行緒%s執行結束" % self.name)
t1 = MyThreading(1)
t2 = MyThreading(2)
start_time = time.time()
t1.setDaemon(True)
t1.start()
t2.setDaemon(True)
t2.start()
end_time = time.time()
print("兩個執行緒一共的執行時間為:", end_time-start_time)
print("主執行緒結束")
  • 注意:如果要設定為守護執行緒,一定要在開啟執行緒之前,將該執行緒設定為守護執行緒
  • 結論:主執行緒結束後,無論子執行緒1,2是否執行完成,都結束執行緒,不在繼續向下執行
  • 第三種:加入join方法設定同步
  • 當不給程式設定守護行程時,主程式將一直等待子程式全部執行完成才結束
  • 程式碼演示:
# join:執行緒同步
import threading
import time
class MyThreading(threading.Thread):
    def __init__(self, name):
        super(MyThreading, self).__init__()
        self.name = name
    # 執行緒要執行的程式碼
    def run(self):
        print("我是執行緒%s" % self.name)
        time.sleep(3)
        print("執行緒%s執行結束" % self.name)
threading_list = []
start_time = time.time()
for x in range(50):
    t = MyThreading(x)
    t.start()
    threading_list.append(t)
for x in threading_list:
    x.join()    # 為執行緒開啟同步
end_time = time.time()
print("50個執行緒一共的執行時間為:", end_time-start_time)
print("主執行緒結束")

結論:主執行緒等待50個子執行緒全部執行完成才結束。

執行緒鎖(互斥鎖Mutex)

  • 一個程序下可以啟用多個執行緒,多個執行緒共用父程序的記憶體空間,也就意味著每個執行緒可以存取同一份資料,此時如果多個執行緒同時要修改一份資料,會出現什麼狀況?
  • 程式碼演示:
# -*- coding:utf8  -*-
import threading
import time
num = 100
threading_list = []
def fun():
    global num
    print("get num:", num)
    num += 1
    time.sleep(1)
for x in range(200):
    t = threading.Thread(target=fun)
    t.start()
    threading_list.append(t)
for x in threading_list:
    x.join()
print("nun:", num)
  • 結論:執行結果可能會出現num<300的情況
  • 正常來講,這個num結果應該是300, 但在python 2.7上多執行幾次,會發現,最後列印出來的num結果不總是300,為什麼每次執行的結果不一樣呢? 哈,很簡單,假設你有A,B兩個執行緒,此時都 要對num 進行加1操作, 由於2個執行緒是並行同時執行的,所以2個執行緒很有可能同時拿走了num=100這個初始變數交給cpu去運算,當A執行緒去處完的結果是101,但此時B執行緒運算完的結果也是101,兩個執行緒同時CPU運算的結果再賦值給num變數後,結果就都是101。那怎麼辦呢? 很簡單,每個執行緒在要修改公共資料時,為了避免自己在還沒改完的時候別人也來修改此資料,可以給這個資料加一把鎖, 這樣其它執行緒想修改此資料時就必須等待你修改完畢並把鎖釋放掉後才能再存取此資料。 

*注:不要在3.x上執行,不知為什麼,3.x上的結果總是正確的,可能是自動加了鎖

加鎖版本:

import random
import threading
import time
num = 100
threading_list = []
def fun():
    global num
    time.sleep(random.random())
    lock.acquire() # 加鎖
    print("get num:", num, threading.current_thread())
    num += 1
    lock.release()  # 釋放鎖
# 範例化鎖物件
lock = threading.Lock()
for x in range(200):
    t = threading.Thread(target=fun)
    t.start()
    threading_list.append(t)
for x in threading_list:
    x.join()
print("num:", num)

GIL VS Lock

機智的同學可能會問到這個問題,就是既然你之前說過了,Python已經有一個GIL來保證同一時間只能有一個執行緒來執行了,為什麼這裡還需要lock? 注意啦,這裡的lock是使用者級的lock,跟那個GIL沒關係 ,具體我們通過下圖來看一下+配合我現場講給大家,就明白了。

那你又問了, 既然使用者程式已經自己有鎖了,那為什麼C python還需要GIL呢?加入GIL主要的原因是為了降低程式的開發的複雜度,比如現在的你寫python不需要關心記憶體回收的問題,因為Python直譯器幫你自動定期進行記憶體回收,你可以理解為python直譯器裡有一個獨立的執行緒,每過一段時間它起wake up做一次全域性輪詢看看哪些記憶體資料是可以被清空的,此時你自己的程式 裡的執行緒和 py直譯器自己的執行緒是並行執行的,假設你的執行緒刪除了一個變數,py直譯器的垃圾回收執行緒在清空這個變數的過程中的clearing時刻,可能一個其它執行緒正好又重新給這個還沒來及得清空的記憶體空間賦值了,結果就有可能新賦值的資料被刪除了,為了解決類似的問題,python直譯器簡單粗暴的加了鎖,即當一個執行緒執行時,其它人都不能動,這樣就解決了上述的問題, 這可以說是Python早期版本的遺留問題。

RLock(遞迴鎖)

說白了就是在一個大鎖中還要再包含子鎖

import threading, time
def run1():
    lock.acquire()
    print("grab the first part data")
    global num
    num += 1
    lock.release()
    return num
def run2():
    lock.acquire()
    print("grab the second part data")
    global num2
    num2 += 1
    lock.release()
    return num2
def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)
if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(3):
        t = threading.Thread(target=run3)
        t.start()
while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

在開發的過程中要注意有些操作預設都是 執行緒安全的(內部整合了鎖的機制),我們在使用的時無需再通過鎖再處理,例如:

import threading
data_list = []
lock_object = threading.RLock()
def task():
    print("開始")
    for i in range(1000000):
        data_list.append(i)
    print(len(data_list))
for i in range(2):
    t = threading.Thread(target=task)
    t.start()

Semaphore(號誌)

  • 互斥鎖同時只允許一個執行緒修改資料,而Semaphore是同時允許一定數量的執行緒修改資料,比如廁所有三個坑,那最多隻允許三個人上廁所,後面的人只能等前面的人出來才能進去。
  • 程式碼演示:
# -*- coding:GBK -*-
import threading
import time

sum_1 = 0
def run(i):
    global sum_1
    time.sleep(1)
    # lock.acquire()
    semaphore.acquire()
    sum_1 += 1
    print("執行緒%s來了,並修改了sum_1的值為:%s" % (i, sum_1))
    semaphore.release()
    # lock.release()

# lock = threading.Lock()
semaphore = threading.BoundedSemaphore(5)

for x in range(10):
    t = threading.Thread(target=run, args=(x,))
    t.start()

while threading.active_count() != 1:
    pass

print("程式結束")

Event(事件)

  • 通過Event來實現兩個或多個執行緒間的互動,下面是一個紅綠燈的例子,即起動一個執行緒做交通指揮燈,生成幾個執行緒做車輛,車輛行駛按紅燈停,綠燈行的規則。
  • 四個常用方法

set()  # 設定標誌位為 True
clear()   # 清空標誌位(將標誌位改為false)
is_set()  # 檢測標誌位,如果標誌位被設定,返回True,否則返回False
wait()   # 等待標誌位被設定位True程式才繼續往下執行

程式碼演示:

# -*- coding:utf-8 -*-
import threading
import time
def light():
    count = 1
    event.set()  # 設定標誌位 True
    while True:
        if count <= 10:
            print("現在是綠燈")
            time.sleep(1)
        elif count <= 15:
            print("現在是紅燈")
            event.clear()   # 清空標誌位(將標誌位改為false)
            time.sleep(1)
        else:
            count = 0
            event.set()
        count += 1
def car(name):
    while True:
        if event.is_set():
            print("----------%s在起飛-------------" % name)
            time.sleep(1)
        else:
            print("---------%s在等紅燈---------------" % name)
            event.wait()   # 等待標誌位被設定位True程式才繼續往下執行
event = threading.Event()
light_1 = threading.Thread(target=light)
light_1.start()
for x in range(5):
    car_1 = threading.Thread(target=car, args=("馬自達"+str(x),))
    car_1.start()

紅綠燈案例

Queue(佇列)

queue.Queue(maxsize=0)
#佇列:先進先出  maxsize:設定佇列的大小
queue.LifoQueue(maxsize=0)
##last in fisrt out  maxsize:設定佇列的大小
queue.PriorityQueue(maxsize=0)
#儲存資料時可設定優先順序的佇列,按優先順序順序(最低的先)  maxsize:設定佇列的大小  

exceptionqueue.Empty

Exception raised when non-blocking get()(or get_nowait()) is called on a Queue object which is empty.

當在一個空的佇列物件上呼叫非阻塞的get()(或get_nowait())時,會產生異常。

exceptionqueue.Full

Exception raised when non-blocking put() (or put_nowait() ) is called on a Queue object which is full.

當非阻塞的put()(或put_nowait())被呼叫到一個已滿的佇列物件上時引發的異常。

import queue
# 範例化佇列物件
q = queue.Queue(3)
print(q.qsize())    # 獲取佇列內資料的長度
print(q.empty())    # 如果佇列是空的,返回True,否則返回False(不可靠!)
print(q.full())     # 如果佇列已滿,返回True,否則返回False(不可靠!)。
"""
Queue.put(item, block=True, timeout=None)
可以簡寫:Queue.put(item, True, 5)
將專案放入佇列。
如果可選的args block為true(預設值),並且timeout為None(預設值),必要時進行阻塞,直到有空閒的槽。
如果timeout是一個正數,它最多阻斷timeout秒,如果在這段時間內沒有空閒槽,則引發Full異常。
否則(block為false),如果有空閒的槽立即可用,就在佇列上放一個專案,否則就引發Full異常(在這種情況下忽略超時)。
"""
q.put(1)            # 資料「1」進入佇列
q.put("nihao")      # 資料"nihao"進入佇列
q.put("456ni", block=True, timeout=5)
'''
將一個專案放入佇列中,不進行阻斷。
只有在有空閒位置的情況下才排隊。
否則會引發Full異常。
'''
# q.put_nowait(123)

'''
Queue.get(block=True, timeout=None)
可以簡寫:Queue.get(True, 3)
從佇列中刪除並返回一個專案。
如果可選的args'block'為True(預設),'timeout'為無(預設)。
    就會在必要時阻塞,直到有一個專案可用。
    如果'timeout'是非負數,它最多阻斷'timeout'秒,如果在這段時間內沒有專案可用,則引發Empty異常。
否則('block'為False),如果有一個專案立即可用,則返回一個專案。
    否則引發Empty異常('timeout'被忽略了在這種情況下)。
'''
print(q.get())
print(q.get())
print(q.get())
print(q.get(block=True, timeout=2))
'''
從佇列中移除並返回一個專案,而不阻塞。
只有當一個專案立即可用時,才會得到一個專案。
否則引發Empty異常。
'''
# print(q.get_nowait())

生產者消費者模型

在並行程式設計中使用生產者和消費者模式能夠解決絕大多數並行問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

# 生產者/消費者
import threading
import queue
import time
# 生產者
def producer(name):
    count = 1
    while True:
        p.put("{}骨頭{}".format(name, count))
        print("骨頭{}被{}生產".format(count, name).center(60, "*"))
        count += 1
        time.sleep(0.1)
# 消費者
def consumer(name):
    while True:
        print("{}被{}吃掉了".format(p.get(), name))
# 範例化佇列物件
p = queue.Queue(10)
# 建立生產者執行緒
producer_threading1 = threading.Thread(target=producer, args=("飛某人",))
producer_threading2 = threading.Thread(target=producer, args=("Alex",))
# 建立消費者執行緒
consumer_threading1 = threading.Thread(target=consumer, args=("張三",))
consumer_threading2 = threading.Thread(target=consumer, args=("李四",))
producer_threading1.start()
producer_threading2.start()
consumer_threading1.start()
consumer_threading2.start()

執行緒池

Python3中官方才正式提供執行緒池。

執行緒不是開的越多越好,開的多了可能會導致系統的效能更低了,例如:如下的程式碼是不推薦在專案開發中編寫。

import threading
def task(video_url):
    pass
url_list = ["www.xxxx-{}.com".format(i) for i in range(30000)]
for url in url_list:
    t = threading.Thread(target=task, args=(url,))
    t.start()
# 這種每次都建立一個執行緒去操作,建立任務的太多,執行緒就會特別多,可能效率反倒降低了。

建議:使用執行緒池

import time
from concurrent.futures import ThreadPoolExecutor  # 並行期貨,執行緒池執行者
"""
pool = ThreadPoolExecutor(100)
pool.submit(函數名,引數1,引數2,引數...)
"""
def task(video_url, num):
    print("開始執行任務", video_url, num)     # 開始執行任務 www.xxxx-299.com 3
    time.sleep(1)
# 建立執行緒池,最多維護10個執行緒
threadpool = ThreadPoolExecutor(10)
# 生成300網址,並放入列表
url_list = ["www.xxxx-{}.com".format(i) for i in range(300)]
for url in url_list:
    """
    線上程池中提交一個任務,執行緒池如果有空閒執行緒,則分配一個執行緒去執行,執行完畢後在將執行緒交還給執行緒池,
    如果沒有空閒執行緒,則等待。注意在等待時,與主執行緒無關,主執行緒依然在繼續執行。
    """
    threadpool.submit(task, url, 3)
print("等待執行緒池中的任務執行完畢中······")
threadpool.shutdown(True)   # 等待執行緒池中的任務執行完畢後,在繼續執行
print("END")

任務執行完任務,再幹點其他事:

"""執行緒池的回撥"""
import time
import random
from concurrent.futures import ThreadPoolExecutor
def task(video_url):
    print("開始執行任務", video_url)
    time.sleep(1)
    return random.randint(0, 10)    # 將結果封裝成一個Futuer物件,返回給執行緒池
def done(response):     # response就是futuer物件,也就是task的返回值分裝的一個Futuer物件
    print("任務執行完後,回撥的函數", response.result())    # 即Futuer.result():取出task的返回值
# 建立執行緒池
threadpool = ThreadPoolExecutor(10)
url_list = ["www.xxxx-{}.com".format(i) for i in range(5)]
for url in url_list:
    futuer = threadpool.submit(task, url)    # futuer是由task返回的一個Future物件,裡面有記錄task的返回值
    futuer.add_done_callback(done)           # 回撥done函數,執行者依然是子執行緒
# 優點:可以做分工,例如:task專門下載,done專門將下載的資料寫入本地檔案。

到此這篇關於詳解Python中的多執行緒的文章就介紹到這了,更多相關Python多執行緒內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com