首頁 > 軟體

Python中的執行緒操作模組(oncurrent)

2022-05-30 18:01:37

程序是cpu資源分配的最小單元,一個程序中可以有多個執行緒。

執行緒是cpu計算的最小單元。

對於Python來說他的程序和執行緒和其他語言有差異,是有GIL鎖。

GIL鎖

GIL鎖保證一個程序中同一時刻只有一個執行緒被cpu排程。

GIL鎖,全域性直譯器鎖。用於限制一個程序中同一時刻只有一個執行緒被cpu排程。
擴充套件:預設GIL鎖在執行100個cpu指令(過期時間)。
檢視GIL切換的指令個數

import sys
v1 = sys。getcheckinterval()
print(v1)

一、通過threading.Thread類建立執行緒

1、 建立執行緒的方式:直接使用Thread

from threading import Thread 
import time 

def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('nick',))
    t.start()
    print('主執行緒')

2、 建立執行緒的方式:繼承Thread

from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('nick')
    t.start()
    print('主執行緒')

二、多執行緒與多程序

1、 pid的比較

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    # part1:在主程序下開啟多個執行緒,每個執行緒都跟主程序的pid一樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主執行緒/主程序pid',os.getpid())

    # part2:開多個程序,每個程序都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主執行緒/主程序pid',os.getpid())

2、 開啟效率的較量

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    # 在主程序下開啟執行緒
    t=thread(target=work)
    t.start()
    print('主執行緒/主程序')
    '''
    列印結果:
    hello
    主執行緒/主程序
    '''

    # 在主程序下開啟子程序
    t=Process(target=work)
    t.start()
    print('主執行緒/主程序')
    '''
    列印結果:
    主執行緒/主程序
    hello
    '''

3、 記憶體資料的共用問題

from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) # 毫無疑問子程序p已經將自己的全域性的n改成了0,但改的僅僅是它自己的,檢視父程序的n仍然為100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('主',n) # 檢視結果為0,因為同一程序內的執行緒之間共用程序內的資料

三、Thread類的其他方法

Thread範例物件的方法:

  • isAlive():返回執行緒是否活動的。
  • getName():返回執行緒名。
  • setName():設定執行緒名。

threading模組提供的一些方法:

  • threading.currentThread():返回當前的執行緒變數。
  • threading.enumerate():返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  • threading.activeCount():返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。

1、 程式碼範例

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    # 在主程序下開啟執行緒
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread())
 # 主執行緒
    print(threading.enumerate())
 # 連同主執行緒在內有兩個執行的執行緒
    print(threading.active_count())
    print('主執行緒/主程序')

    '''
    列印結果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主執行緒/主程序
    Thread-1
    '''

2、 join方法

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('nick',))
    t.start()
    t.join()
    print('主執行緒')
    print(t.is_alive())
'''
    nick say hello
    主執行緒
    False
    '''

四、多執行緒實現socket

import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()

        p=threading.Thread(target=action,args=(conn,))
        p.start()

五、守護執行緒

無論是程序還是執行緒,都遵循:守護xx會等待主xx執行完畢後被銷燬。需要強調的是:執行完畢並非終止執行。

  • 對主程序來說,執行完畢指的是主程序程式碼執行完畢
  • 對主執行緒來說,執行完畢指的是主執行緒所在的程序內所有非守護執行緒統統執行完畢,主執行緒才算執行完畢

1、 詳細解釋

  • 主程序在其程式碼結束後就已經算執行完畢了(守護行程在此時就被回收),然後主程序會一直等非守護的子程序都執行完畢後回收子程序的資源(否則會產生殭屍程序),才會結束。

  • 主執行緒在其他非守護執行緒執行完畢後才算執行完畢(守護執行緒在此時就被回收)。因為主執行緒的結束意味著程序的結束,程序整體的資源都將被回收,而程序必須保證非守護執行緒都執行完畢後才能結束。

2、 守護執行緒例

from threading import Thread
import time


def foo():
    print(123)
    time.sleep(10)
    print("end123")


def bar():
    print(456)
    time.sleep(10)
    print("end456")


t1 = Thread(target=foo)
t2 = Thread(target=bar)

t1.daemon= True #必須在t.start()之前設定
# t1.setDaemon(True)

t1.start()
t2.start()
print("main-------")
print(t1.is_alive())
# 123
# 456
# main-------
# end456

六、同步鎖

1、 多個執行緒搶佔資源的情況

from threading import Thread
import os,time
def work():
    

global n temp=n 
time.sleep(0.1) 
n=temp-1 

if __name__ == '__main__':
    n=100


    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果可能為99

2、同步鎖的參照

對公共資料的操作

import threading

R=threading.Lock()
R.acquire()
'''
對公共資料的操作
'''
R.release()

3、範例

不加鎖:並行執行,速度快,資料不安全

from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''

加鎖:未加鎖部分並行執行,加鎖部分序列執行,速度慢,資料安全

from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的程式碼並行執行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的程式碼序列執行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

七、死鎖與遞迴鎖

所謂死鎖:是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,如下就是死鎖

1、 死鎖

from threading import Lock as Lock
import time

mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)

mutexA.release()
mutexA.release()

解決方法:遞迴鎖,在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。

2、 遞迴鎖(可重入鎖)RLock

這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖。

from threading import RLock as Lock
import time

mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

3、典型問題:科學家吃麵

遞迴鎖解決死鎖問題

import time
from threading import Thread,RLock

fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麵條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麵'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麵條' % name)
    print('%s 吃麵' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','nick','tank']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

八、執行緒佇列

queue佇列:使用import queue,用法與程序Queue一樣

當必須在多個執行緒之間安全地交換資訊時,佇列線上程程式設計中特別有用。

1、先進先出:Queue

通過雙向列表實現的

class queue.Queue(maxsize=0)

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''

2、後進先出:LifoQueue

通過堆實現

class queue.LifoQueue(maxsize=0)

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''

3、儲存資料時可設定優先順序的佇列:PriorityQueue

PriorityQueue類和LifoQueue類繼承Queue類然後重寫了_init、_qsize、_put、_get這四個類的私有方法.

通過list來實現的。

class queue.PriorityQueue(maxsize=0)

優先佇列的建構函式。maxsize是一個整數,它設定可以放置在佇列中的項數的上限。一旦達到此大小,插入將阻塞,直到佇列項被使用。如果maxsize小於或等於0,則佇列大小為無窮大。

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數位,也可以是非數位之間的比較),數位越小優先順序越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數位越小優先順序越高,優先順序高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

更多方法說明

  • __init__(self, maxsize=0) :初始化佇列長度,maxsize為0的時候長度為無限
  • empty(self) :返回佇列是否為空
  • full(self) :返回佇列是否為滿
  • qsize(self) :返回佇列的大小(並不可靠)
  • get(self, block=True, timeout=None) :從隊頭獲取並刪除元素,block為true:timeout為None時候,阻塞當前執行緒直到佇列中有可用元素;timeout為非負時候,等了timeout的時間還沒有可用元素時候丟擲一個Empty異常;block為false:timeout為None時候,佇列為空則丟擲Empty異常;timeout為非負時候,等待timeout時候後沒有可用元素則丟擲Empty異常。
  • get_nowait(self) :#返回self.get(block=False)
  • put(self, item, block=True, timeout=None): 在隊尾插入一個元素,block為true:timeout為None時候,阻塞當前執行緒直到佇列中有可用位置;timeout為非負時候,等了timeout時間還沒有可用位置時候丟擲一個Full異常;block為false:timeout為None時候,佇列沒有位置則丟擲Full異常;timeout為非負時候,等待timeout時候後還是沒有可用位置則丟擲Full異常。
  • put_nowait(self, item) :返回 self.put(item, block=False)
  • join(self) :阻塞當前執行緒直到佇列的任務全部完成了
  • task_done(self) :通知佇列任務的完成情況,當完成時候喚醒被join阻塞的執行緒

九、Python標準模組——concurrent.futures

官方檔案:https://docs.python.org/dev/library/concurrent.futures.html

1、介紹

concurrent.futures模組提供了高度封裝的非同步呼叫介面:

  • ThreadPoolExecutor:執行緒池,提供非同步呼叫
  • ProcessPoolExecutor:程序池,提供非同步呼叫

兩者都實現了由抽象Executor類定義的相同介面。

ThreadPoolExecutor(執行緒池)與ProcessPoolExecutor(程序池)都是concurrent.futures模組下的,主執行緒(或程序)中可以獲取某一個執行緒(程序)執行的狀態或者某一個任務執行的狀態及返回值。

通過submit返回的是一個future物件,它是一個未來可期的物件,通過它可以獲悉執行緒的狀態。

比較:

  • 1、執行緒不是越多越好,會涉及cpu上下文的切換(會把上一次的記錄儲存)。
  • 2、程序比執行緒消耗資源,程序相當於一個工廠,工廠裡有很多人,裡面的人共同享受著福利資源,,一個程序裡預設只有一個主執行緒,比如:開啟程式是程序,裡面執行的是執行緒,執行緒只是一個程序建立多個人同時去工作。
  • 3、執行緒裡有GIL全域性解鎖器:不允許cpu排程
  • 4、計算密度型適用於多程序
  • 5、執行緒:執行緒是計算機中工作的最小單元
  • 6、程序:預設有主執行緒 (幫工作)可以多執行緒共存
  • 7、協程:一個執行緒,一個程序做多個任務,使用程序中一個執行緒去做多個任務,微執行緒
  • 8、GIL全域性直譯器鎖:保證同一時刻只有一個執行緒被cpu排程

2、基本方法

  • submit(fn, *args, **kwargs):非同步提交任務
  • map(func, *iterables, timeout=None, chunksize=1):取代for迴圈submit的操作
  • shutdown(wait=True):相當於程序池的pool.close()+pool.join()操作
    wait=True,等待池內所有任務執行完畢回收完資源後才繼續 ,
    wait=False,立即返回,並不會等待池內的任務執行完畢 ,
    但不管wait引數為何值,整個程式都會等到所有任務執行完畢 ,submit和map必須在shutdown之前。
  • result(timeout=None):取得結果
  • add_done_callback(fn):回撥函數
  • done():判斷某一個執行緒是否完成
  • cancle():取消某個任務

3、ProcessPoolExecutor、ThreadPoolExecutor執行緒池

ThreadPoolExecutor構造範例的時候,傳入max_workers引數來設定執行緒中最多能同時執行的執行緒數目 。

使用submit函數來提交執行緒需要執行任務(函數名和引數)到執行緒池中,並返回該任務的控制程式碼(類似於檔案、畫圖),注意submit()不是阻塞的,而是立即返回。

通過submit函數返回的任務控制程式碼,能夠使用done()方法判斷該任務是否結束。

使用result()方法可以獲取任務的返回值,檢視內部程式碼,發現這個方法是阻塞的。

對於頻繁的cpu操作,由於GIL鎖的原因,多個執行緒只能用一個cpu,這時多程序的執行效率要比多執行緒高。

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(
future.result())

4、過wait()判斷執行緒執行的狀態:

wait方法可以讓主執行緒阻塞,直到滿足設定的要求。

wait(fs, timeout=None, return_when=ALL_COMPLETED),wait接受3個引數,

  • s表示執行的task序列;
  • timeout表示等待的最長時間,超過這個時間即使執行緒未執行完成也將返回;
  • return_when表示wait返回結果的條件,預設為ALL_COMPLETED全部執行完成再返回
import time
from concurrent.futures import (
    ThreadPoolExecutor, wait
)


def get_thread_time(times):
    time.sleep(times)
    return times


start = time.time()
executor = ThreadPoolExecutor(max_workers=4)
task_list = [executor.submit(get_thread_time, times) for times in [1, 2, 3, 4]]
i = 1
for task in task_list:
    print("task{}:{}".format(i, task))
    i += 1
print(wait(task_list, timeout=2.5))

# wait在2.5秒後返回執行緒的狀態,result:
# task1:<Future at 0x7ff3c885f208 state=running>
# task2:<Future at 0x7ff3c885fb00 state=running>
# task3:<Future at 0x7ff3c764b2b0 state=running>
# task4:<Future at 0x7ff3c764b9b0 state=running>
# DoneAndNotDoneFutures(
# done={<Future at 0x7ff3c885f208 state=finished returned int>, <Future at 0x7ff3c885fb00 state=finished returned int>},
# not_done={<Future at 0x7ff3c764b2b0 state=running>, <Future at 0x7ff3c764b9b0 state=running>})
#
# 可以看到在timeout 2.5時,task1和task2執行完畢,task3和task4仍在執行中

4、map的用法

map(fn, *iterables, timeout=None),第一個引數fn是執行緒執行的函數;第二個引數接受一個可迭代物件;第三個引數timeout跟wait()的timeout一樣,但由於map是返回執行緒執行的結果,如果timeout小於執行緒執行時間會拋異常TimeoutError。

map的返回是有序的,它會根據第二個引數的順序返回執行的結果:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12))

 #map取代了for+submit

5、s_completed返回執行緒執行結果

上面雖然提供了判斷任務是否結束的方法,但是不能在主執行緒中一直判斷,有時候我們是得知某個任務結束了,就去獲取結果,而不是一直判斷每個任務有沒有結束。這是就可以使用as_completed方法一次取出所有任務的結果。

import time
from collections import OrderedDict
from concurrent.futures import (
    ThreadPoolExecutor, as_completed
)


def get_thread_time(times):
    time.sleep(times)
    return times


start = time.time()
executor = ThreadPoolExecutor(max_workers=4)
task_list = [executor.submit(get_thread_time, times) for times in [2, 3, 1, 4]]
task_to_time = OrderedDict(zip(["task1", "task2", "task3", "task4"],[2, 3, 1, 4]))
task_map = OrderedDict(zip(task_list, ["task1", "task2", "task3", "task4"]))

for result in as_completed(task_list):
    task_name = task_map.get(result)
    print("{}:{}".format(task_name,task_to_time.get(task_name)))

# task3: 1
# task1: 2
# task2: 3
# task4: 4

task1、task2、task3、task4的等待時間分別為2s、3s、1s、4s,通過as_completed返回執行完的執行緒結果,as_completed(fs, timeout=None)接受2個引數,第一個是執行的執行緒列表,第二個引數timeout與map的timeout一樣,當timeout小於執行緒執行時間會拋異常TimeoutError。

通過執行結果可以看出,as_completed返回的順序是執行緒執行結束的順序,最先執行結束的執行緒最早返回。

6、回撥函數

Future物件也可以像協程一樣,當它設定完成結果時,就可以立即進行回撥別的函數。add_done_callback(fn),則表示 Futures 完成後,會調⽤fn函數。

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<程序%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<程序%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future物件obj,需要用obj.result()拿到結果

到此這篇關於Python執行緒操作模組(oncurrent)的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com