<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
程序是cpu資源分配的最小單元,一個程序中可以有多個執行緒。
執行緒是cpu計算的最小單元。
對於Python來說他的程序和執行緒和其他語言有差異,是有GIL鎖。
GIL鎖保證一個程序中同一時刻只有一個執行緒被cpu排程。
GIL鎖,全域性直譯器鎖。用於限制一個程序中同一時刻只有一個執行緒被cpu排程。
擴充套件:預設GIL鎖在執行100個cpu指令(過期時間)。
檢視GIL切換的指令個數
import sys v1 = sys。getcheckinterval() print(v1)
一、通過threading.Thread類建立執行緒
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('nick',)) t.start() print('主執行緒')
from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('nick') t.start() print('主執行緒')
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': # part1:在主程序下開啟多個執行緒,每個執行緒都跟主程序的pid一樣 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主執行緒/主程序pid',os.getpid()) # part2:開多個程序,每個程序都有不同的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主執行緒/主程序pid',os.getpid())
from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': # 在主程序下開啟執行緒 t=thread(target=work) t.start() print('主執行緒/主程序') ''' 列印結果: hello 主執行緒/主程序 ''' # 在主程序下開啟子程序 t=Process(target=work) t.start() print('主執行緒/主程序') ''' 列印結果: 主執行緒/主程序 hello '''
from threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100 # p=Process(target=work) # p.start() # p.join() # print('主',n) # 毫無疑問子程序p已經將自己的全域性的n改成了0,但改的僅僅是它自己的,檢視父程序的n仍然為100 n=1 t=Thread(target=work) t.start() t.join() print('主',n) # 檢視結果為0,因為同一程序內的執行緒之間共用程序內的資料
Thread範例物件的方法:
isAlive()
:返回執行緒是否活動的。getName()
:返回執行緒名。setName()
:設定執行緒名。threading模組提供的一些方法:
threading.currentThread()
:返回當前的執行緒變數。threading.enumerate()
:返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。threading.activeCount()
:返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': # 在主程序下開啟執行緒 t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) # 主執行緒 print(threading.enumerate()) # 連同主執行緒在內有兩個執行的執行緒 print(threading.active_count()) print('主執行緒/主程序') ''' 列印結果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主執行緒/主程序 Thread-1 '''
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('nick',)) t.start() t.join() print('主執行緒') print(t.is_alive()) ''' nick say hello 主執行緒 False '''
import multiprocessing import threading import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) def action(conn): while True: data=conn.recv(1024) print(data) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start()
無論是程序還是執行緒,都遵循:守護xx會等待主xx執行完畢後被銷燬。需要強調的是:執行完畢並非終止執行。
主程序在其程式碼結束後就已經算執行完畢了(守護行程在此時就被回收),然後主程序會一直等非守護的子程序都執行完畢後回收子程序的資源(否則會產生殭屍程序),才會結束。
主執行緒在其他非守護執行緒執行完畢後才算執行完畢(守護執行緒在此時就被回收)。因為主執行緒的結束意味著程序的結束,程序整體的資源都將被回收,而程序必須保證非守護執行緒都執行完畢後才能結束。
from threading import Thread import time def foo(): print(123) time.sleep(10) print("end123") def bar(): print(456) time.sleep(10) print("end456") t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon= True #必須在t.start()之前設定 # t1.setDaemon(True) t1.start() t2.start() print("main-------") print(t1.is_alive()) # 123 # 456 # main------- # end456
from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果可能為99
對公共資料的操作
import threading R=threading.Lock() R.acquire() ''' 對公共資料的操作 ''' R.release()
不加鎖:並行執行,速度快,資料不安全
from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 '''
加鎖:未加鎖部分並行執行,加鎖部分序列執行,速度慢,資料安全
from threading import current_thread,Thread,Lock import os,time def task(): #未加鎖的程式碼並行執行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的程式碼序列執行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 '''
所謂死鎖:是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,如下就是死鎖
from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
解決方法:遞迴鎖,在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖。
from threading import RLock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
遞迴鎖解決死鎖問題
import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麵條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麵'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麵條' % name) print('%s 吃麵' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','nick','tank']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
queue佇列:使用import queue
,用法與程序Queue一樣
當必須在多個執行緒之間安全地交換資訊時,佇列線上程程式設計中特別有用。
通過雙向列表實現的
class queue.Queue(maxsize=0)
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
通過堆實現
class queue.LifoQueue(maxsize=0)
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
PriorityQueue類和LifoQueue類繼承Queue類然後重寫了_init、_qsize、_put、_get這四個類的私有方法.
通過list來實現的。
class queue.PriorityQueue(maxsize=0)
優先佇列的建構函式。maxsize是一個整數,它設定可以放置在佇列中的項數的上限。一旦達到此大小,插入將阻塞,直到佇列項被使用。如果maxsize小於或等於0,則佇列大小為無窮大。
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先順序(通常是數位,也可以是非數位之間的比較),數位越小優先順序越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數位越小優先順序越高,優先順序高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''
更多方法說明
官方檔案:https://docs.python.org/dev/library/concurrent.futures.html
concurrent.futures模組提供了高度封裝的非同步呼叫介面:
兩者都實現了由抽象Executor類定義的相同介面。
ThreadPoolExecutor(執行緒池)與ProcessPoolExecutor(程序池)都是concurrent.futures模組下的,主執行緒(或程序)中可以獲取某一個執行緒(程序)執行的狀態或者某一個任務執行的狀態及返回值。
通過submit返回的是一個future物件,它是一個未來可期的物件,通過它可以獲悉執行緒的狀態。
ThreadPoolExecutor構造範例的時候,傳入max_workers引數來設定執行緒中最多能同時執行的執行緒數目 。
使用submit函數來提交執行緒需要執行任務(函數名和引數)到執行緒池中,並返回該任務的控制程式碼(類似於檔案、畫圖),注意submit()不是阻塞的,而是立即返回。
通過submit函數返回的任務控制程式碼,能夠使用done()方法判斷該任務是否結束。
使用result()方法可以獲取任務的返回值,檢視內部程式碼,發現這個方法是阻塞的。
對於頻繁的cpu操作,由於GIL鎖的原因,多個執行緒只能用一個cpu,這時多程序的執行效率要比多執行緒高。
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print( future.result())
wait方法可以讓主執行緒阻塞,直到滿足設定的要求。
wait(fs, timeout=None, return_when=ALL_COMPLETED),wait接受3個引數,
import time from concurrent.futures import ( ThreadPoolExecutor, wait ) def get_thread_time(times): time.sleep(times) return times start = time.time() executor = ThreadPoolExecutor(max_workers=4) task_list = [executor.submit(get_thread_time, times) for times in [1, 2, 3, 4]] i = 1 for task in task_list: print("task{}:{}".format(i, task)) i += 1 print(wait(task_list, timeout=2.5)) # wait在2.5秒後返回執行緒的狀態,result: # task1:<Future at 0x7ff3c885f208 state=running> # task2:<Future at 0x7ff3c885fb00 state=running> # task3:<Future at 0x7ff3c764b2b0 state=running> # task4:<Future at 0x7ff3c764b9b0 state=running> # DoneAndNotDoneFutures( # done={<Future at 0x7ff3c885f208 state=finished returned int>, <Future at 0x7ff3c885fb00 state=finished returned int>}, # not_done={<Future at 0x7ff3c764b2b0 state=running>, <Future at 0x7ff3c764b9b0 state=running>}) # # 可以看到在timeout 2.5時,task1和task2執行完畢,task3和task4仍在執行中
map(fn, *iterables, timeout=None),第一個引數fn是執行緒執行的函數;第二個引數接受一個可迭代物件;第三個引數timeout跟wait()的timeout一樣,但由於map是返回執行緒執行的結果,如果timeout小於執行緒執行時間會拋異常TimeoutError。
map的返回是有序的,它會根據第二個引數的順序返回執行的結果:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit
上面雖然提供了判斷任務是否結束的方法,但是不能在主執行緒中一直判斷,有時候我們是得知某個任務結束了,就去獲取結果,而不是一直判斷每個任務有沒有結束。這是就可以使用as_completed方法一次取出所有任務的結果。
import time from collections import OrderedDict from concurrent.futures import ( ThreadPoolExecutor, as_completed ) def get_thread_time(times): time.sleep(times) return times start = time.time() executor = ThreadPoolExecutor(max_workers=4) task_list = [executor.submit(get_thread_time, times) for times in [2, 3, 1, 4]] task_to_time = OrderedDict(zip(["task1", "task2", "task3", "task4"],[2, 3, 1, 4])) task_map = OrderedDict(zip(task_list, ["task1", "task2", "task3", "task4"])) for result in as_completed(task_list): task_name = task_map.get(result) print("{}:{}".format(task_name,task_to_time.get(task_name))) # task3: 1 # task1: 2 # task2: 3 # task4: 4
task1、task2、task3、task4的等待時間分別為2s、3s、1s、4s,通過as_completed返回執行完的執行緒結果,as_completed(fs, timeout=None)接受2個引數,第一個是執行的執行緒列表,第二個引數timeout與map的timeout一樣,當timeout小於執行緒執行時間會拋異常TimeoutError。
通過執行結果可以看出,as_completed返回的順序是執行緒執行結束的順序,最先執行結束的執行緒最早返回。
Future物件也可以像協程一樣,當它設定完成結果時,就可以立即進行回撥別的函數。add_done_callback(fn),則表示 Futures 完成後,會調⽤fn函數。
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<程序%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<程序%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future物件obj,需要用obj.result()拿到結果
到此這篇關於Python執行緒操作模組(oncurrent)的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支援it145.com。
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45