首頁 > 軟體

Python3的程序和執行緒你瞭解嗎

2022-03-15 13:02:53

1.概述

"""
基礎知識:
1.多工:作業系統可以同時執行多個任務;
2.單核CPU執行多工:作業系統輪流讓各個任務交替執行;
3.一個任務即一個程序(process),如:開啟一個瀏覽器,即啟動一個瀏覽器程序;
4.在一個程序內,要同時幹多件事,需要同時執行多個子任務,把程序內的子任務稱為"執行緒(Thread)";
5.每個程序至少做一件事,因此,一個程序至少有一個執行緒;
同時執行多執行緒的解決方案:
a.啟動多個程序,每個程序雖然只有一個執行緒,但多個程序可以一塊執行多個任務;
b.啟動一個程序,在一個程序內啟動多個執行緒,多個執行緒一塊執行多個任務;
c.啟動多個程序,每個程序啟動多個執行緒;
即多工的實現方式:
a.多程序模式;
b.多執行緒模式;
c.多程序+多執行緒模式;
"""

2.多程序

import os
print("Process (%s) start..." % os.getpid())
"""
只能在Linux/Unix/Mac上工作
pid = os.fork()
if pid == 0:
    print("I am child process (%s) and my parent is %s." % (os.getpid(), os.getppid()))
else:
    print("I (%s) just created a child process (%s)." % (os.getpid(), pid))
"""
print("Hello.")
# multiprocessing:跨平臺多執行緒模組
# process_test.py檔案,在互動下python process_test.py
from multiprocessing import Process
import os
def run_process(name):
    print("Run child process %s (%s)..." % (name, os.getpid()))
if __name__ == "__main__":
    print("Parent process %s." % os.getpid())
    p = Process(target = run_process, args = ("test",))
    print("Child process will start.")
    p.start()
    p.join()        # join()方法可以等待子程序結束後再繼續往下執行,用於程序間的同步
    print("Child process end.")

 # 結果輸出:
Parent process 28340.
Child process will start.
Run child process test (31152)...
Child process end.

# Pool:用程序池批次建立子程序
# process.py檔案,互動下python process.py
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

# 結果輸出:
Parent process 31576.
Waiting for all subprocesses done...
Run task 0 (20416)...
Run task 1 (15900)...
Run task 2 (24716)...
Run task 3 (31148)...
Task 2 runs 0.72 seconds.
Run task 4 (24716)...
Task 4 runs 1.03 seconds.
Task 3 runs 1.82 seconds.
Task 1 runs 2.73 seconds.
Task 0 runs 2.82 seconds.
All subprocesses done.

3.子程序

# subprocess模組:啟動一個子程序,控制其輸入和輸出
# subprocess_test.py檔案,注:檔名不要和模組名相同,否則報錯
import subprocess
print("$ nslookup www.python.org")
r = subprocess.call(["nslookup", "www.python.org"])
print("Exit code:", r)

 # 結果輸出:
$ nslookup www.python.org
伺服器:  cache-a.guangzhou.gd.cn
Address:  202.96.128.86
非權威應答:
名稱:    www.python.org
Addresses:  2a04:4e42:1a::223
          151.101.72.223
Exit code: 0

# 子程序需要輸入,通過communicate()方法
import subprocess
print("$ nslookup")
p = subprocess.Popen(["nslookup"], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
output, err = p.communicate(b"set q = mxnpython.orgnexitn")
print(output.decode("gbk"))
print("Exit code:", p.returncode)

# 結果輸出:
$ nslookup
預設伺服器:  cache-a.guangzhou.gd.cn
Address:  202.96.128.86
> Unrecognized command: set q = mx
> 伺服器:  cache-a.guangzhou.gd.cn
Address:  202.96.128.86
名稱:    python.org
Address:  138.197.63.241

Exit code: 0

4.程序間通訊

# 在父程序中建立兩個子程序,一個往Queue裡寫資料,一個從Queue裡讀資料
# queue_test.py檔案,互動下python queue_test.py
from multiprocessing import Process, Queue
import os, time, random
def write(q):
    print("Process to write:%s" % os.getpid())
    for value in ["W", "I", "L", "L", "A", "R", "D"]:
        print("Put %s to queue..." % value)
        q.put(value)
        time.sleep(random.random())
def read(q):
    print("Process to read:%s" % os.getpid())
    while True:
        value = q.get(True)
        print("Get %s from queue." % value)
if __name__ == "__main__":
    # 父程序建立Queue,並傳給各個子程序
    q = Queue()
    pw = Process(target = write, args = (q,))
    pr = Process(target = read, args = (q,))
    # 啟動子程序pw,寫入
    pw.start()
    # 啟動子程序pr,讀取
    pr.start()
    # 等待pw結束
    pw.join()
    # pr程序是死迴圈,無法等待其結束,需要強行終止
    pr.terminate()

# 結果輸出:
Process to write:15720
Process to read:21524
Put W to queue...
Get W from queue.
Put I to queue...
Get I from queue.
Put L to queue...
Get L from queue.
Put L to queue...
Get L from queue.
Put A to queue...
Get A from queue.
Put R to queue...
Get R from queue.
Put D to queue...
Get D from queue.

5.多執行緒

# 執行緒庫:_thread和threading
# 啟動一個執行緒:即把一個函數傳入並建立一個Thread範例,然後呼叫start()開始執行
# 任何程序預設啟動一個執行緒,該執行緒稱為主執行緒,主執行緒可以啟動新的執行緒
# current_thread()函數:返回當前執行緒的範例;
# 主執行緒範例名字:MainThread;
# 子執行緒名字的建立時指定,如果不指定,則自動給執行緒命名為Thread-1、Thread-2...
import time, threading
def loop():
    print("Thread %s is running..." % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print("Thread %s >>> %s" % (threading.current_thread().name, n))
        time.sleep(1)
    print("Thread %s ended." % threading.current_thread().name)
print("Thread %s is running..." % threading.current_thread().name)
thread1 = threading.Thread(target = loop, name = "LoopThread")
thread1.start()
thread1.join()
print("Thread %s ended." % threading.current_thread().name)

# 結果輸出:
Thread MainThread is running...
Thread LoopThread is running...
Thread LoopThread >>> 1
Thread LoopThread >>> 2
Thread LoopThread >>> 3
Thread LoopThread >>> 4
Thread LoopThread >>> 5
Thread LoopThread ended.
Thread MainThread ended.

6.Lock

# 多程序:同一個變數,各自有一份拷貝存在於每個程序中,互不影響;
# 多執行緒:所有變數由所有執行緒共用,任何一個變數可以被任何一個執行緒修改;
# 多執行緒同時操作一個變數
# 多執行幾次,發現結果不為0
import time, threading
balance = 0
def change_it(n):
    global balance
    balance = balance + n
    balance = balance - n
def run_thread(n):
    # 執行緒交替執行,balance結果不一定為0
    for i in range(2000000):
        change_it(n)
thread1 = threading.Thread(target = run_thread, args = (5,))
thread2 = threading.Thread(target = run_thread, args = (8,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(balance)
# 結果輸出:
# 5(各自不同)
# 確保balance計算正確,需要給change_it()上一把鎖
# 當執行緒開始執行change_it()時,該執行緒獲得鎖,其他執行緒不能同時執行change_it(),
# 只能等待,直到鎖被釋放,獲得該鎖後才能改;
# 通過threading.Lock()建立鎖
import time, threading
balance = 0
lock = threading.Lock()
def change_it(n):
    global balance
    balance = balance + n
    balance = balance - n
def run_thread(n):
    for i in range(2000000):
        lock.acquire()
        try:
            change_it(n)
        finally:
            # 釋放鎖
            lock.release()
thread1 = threading.Thread(target = run_thread, args = (5,))
thread2 = threading.Thread(target = run_thread, args = (8,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(balance)
# 結果輸出:
# 0

7.ThreadLocal

# 多執行緒環境下,每個執行緒有自己的資料;
# 一個執行緒使用自己的區域性變數比使用全域性變數好;
import threading
# 建立全域性ThreadLocal物件
local_school = threading.local()
def process_student():
    # 獲取當前執行緒關聯的student
    std = local_school.student
    print("Hello,%s (in %s)" % (std, threading.current_thread().name))
def process_thread(name):
    # 繫結ThreadLocal的student
    local_school.student = name
    process_student()
thread1 = threading.Thread(target = process_thread, args = ("Willard",), name = "Thread-1")
thread2 = threading.Thread(target = process_thread, args = ("WenYu",), name = "Thread-2")
thread1.start()
thread2.start()
thread1.join()
thread2.join()

# 結果輸出:
# Hello,Willard (in Thread-1)
# Hello,WenYu (in Thread-2)

8.程序VS執行緒

# 程序和執行緒優缺點:
# 1.要實現多工,會設計Master-Worker模式,Master負責分配任務,Worker負責執行任務,
# 在多工環境下,通常是一個Master,多個Worker;
#     a.如果使用多程序實現Master-Worker,主程序即Master,其他程序即Worker;
#     b.如果使用多執行緒實現Master-Worker,主執行緒即Master,其他執行緒即Worker;
# 2.多程序優點:穩定性高,一個子程序崩潰不會影響主程序和其他子程序;
# 3.多程序缺點:建立程序的代價大,作業系統能同時執行的程序數有限;
# 4.多執行緒缺點:任何一個執行緒崩潰,可能直接造成整個程序崩潰;
# 執行緒切換:
# 1.依次完成任務的方式稱為單任務模型,或批次處理任務模型;
# 2.任務1先做n分鐘,切換到任務2做n分鐘,再切換到任務3做n分鐘,依此類推,稱為多工模型;
# 計算密集型 VS IO密集型
# 1.計算密集型任務:要進行大量的計算,消耗CPU資源,如:對視訊進行高清解碼等;
# 2.IO密集型任務:涉及到網路、磁碟IO的任務,均為IO密集型任務;
# 3.IO密集型任務消耗CPU少,大部分時間在等待IO操作完成;
# 非同步IO
# 1.事件驅動模型:用單程序單執行緒模型來執行多工;
# 2.Python語言中,單執行緒的非同步程式設計模型稱為協程;

9.分散式程序

"""
範例:
有一個通過Queue通訊的多程序程式在同一機器上執行,但現在處理任務的程序任務繁重,
希望把傳送任務的程序和處理任務的程序釋出到兩臺機器上;
"""
# task_master_test.py
# 互動環境中:python task_master_test.py
import random, time, queue
from multiprocessing.managers import BaseManager
# 傳送任務的佇列
task_queue = queue.Queue()
# 接收結果的佇列
result_queue = queue.Queue()
def return_task_queue():
    global task_queue
    return task_queue
def return_result_queue():
    global task_queue
    return task_queue
# 從BaseManager繼承的QueueManager
class QueueManager(BaseManager):
    pass
if __name__ == "__main__":
    # 把兩個Queue註冊到網路上,callable引數關聯Queue物件
    QueueManager.register("get_task_queue", callable = return_task_queue)
    QueueManager.register("get_result_queue", callable = return_result_queue)
    # 繫結埠5000,設定驗證碼"Willard"
    manager = QueueManager(address = ("127.0.0.1", 5000), authkey = b"Willard")
    # 啟動Queue
    manager.start()
    # 獲得通過網路存取的Queue物件
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # 放任務進去
    for i in range(10):
        n = random.randint(0, 10000)
        print("Put task %d..." % n)
        task.put(n)
    # 從result佇列讀取結果
    print("Try get results...")
    for i in range(10):
        r = result.get(timeout = 10)
        print("Result:%s" % r)
    # 關閉
    manager.shutdown()
    print("Master Exit.")
# task_worker_test.py檔案
# 互動環境python task_worker_test.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 建立QueueManager
class QueueManager(BaseManager):
    pass
QueueManager.register("get_task_queue")
QueueManager.register("get_result_queue")
# 連線到伺服器
server_address = "127.0.0.1"
print("Connect to server %s..." % server_address)
# 埠和驗證碼
m = QueueManager(address = (server_address, 5000), authkey = b"Willard")
# 網路連線
m.connect()
# 獲取Queue物件
task = m.get_task_queue()
result = m.get_result_queue()
# 從task佇列取任務,把結果寫入result佇列
for i in range(10):
    try:
        n = task.get(timeout = 1)
        print("Run task %d * %d..." % (n, n))
        r = "%d * %d = %d" % (n, n, n * n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print("Task queue is empty.")
print("Worker Exit.")

總結

本篇文章就到這裡了,希望能夠給你帶來幫助,也希望您能夠多多關注it145.com的更多內容!     


IT145.com E-mail:sddin#qq.com