首頁 > 軟體

Python中的程序操作模組(multiprocess.process)

2022-05-30 18:01:46

一、multiprocess模組

multiprocess不是一個模組而是python中一個操作、管理程序的包。

子模組分為四個部分:

  • 建立程序部分(multiprocess.process)
  • 程序同步部分((multiprocess.Lock))
  • 程序池部分((multiprocess.Pool))
  • 程序之間資料共用(ThreadLocal、multiprocess.Queue、Pipes)

二、multiprocess.process模組

process模組是一個建立程序的模組,藉助這個模組,就可以完成程序的建立。

在windows中使用process模組的注意事項

在Windows作業系統中由於沒有fork(linux作業系統中建立程序的機制),在建立子程序的時候會自動 import 啟動它的這個檔案,而在 import 的時候又執行了整個檔案。

因此如果將process()直接寫在檔案中就會無限遞迴建立子程序報錯。所以必須把建立子程序的部分使用if __name__ =='__main__' 判斷保護起來,import 的時候,就不會遞迴執行了。

1、使用process模組建立程序

在一個python程序中開啟子程序,start方法和並行效果。

1 在Python中啟動的第一個子程序

import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)
    print('我是子程序')


if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    # p.join()
    print('我是父程序')

2、 檢視主程序和子程序的程序號

import os
from multiprocessing import Process

def f(x):
    print('子程序id :',os.getpid(),'父程序id :',os.getppid())
    return x*x

if __name__ == '__main__':
    print('主程序id :', os.getpid())
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()

3、 進階,多個程序同時執行

注意,子程序的執行順序不是根據啟動順序決定的。

import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
        p.join()
     # [p.join() for p in p_lst]
    print('父程序在執行')

4、 通過繼承Process類開啟程序

import os
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(os.getpid())
        print('%s 正在和女主播聊天' %self.name)

p1=MyProcess('wupeiqi')
p2=MyProcess('yuanhao')
p3=MyProcess('nezha')

p1.start() # start會自動呼叫run
p2.start()
# p2.run()
p3.start()


p1.join()
p2.join()
p3.join()

print('主執行緒')

5、 程序之間的資料隔離問題

from multiprocessing import Process

def work():
    global n
    n=0
    print('子程序內: ',n)


if __name__ == '__main__':
    n = 100
    p=Process(target=work)
    p.start()
    print('主程序內: ',n)

2、守護行程daemon

會隨著主程序的結束而結束。

主程序建立守護行程

其一:守護行程會在主程序程式碼執行結束後就終止

其二:守護行程內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children

注意:程序之間是互相獨立的,主程序程式碼執行結束,守護行程隨即終止。

1、 守護行程的啟動

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)


p=Myprocess('哪吒')
p.daemon=True # 一定要在p.start()前設定,設定p為守護行程,禁止p建立子程序,並且父程序程式碼執行結束,p即終止執行
p.start()
time.sleep(10)  # 在sleep時檢視程序id對應的程序ps -ef|grep id
print('主')

2、 主程序程式碼執行結束守護行程立即結束

from multiprocessing import Process

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
time.sleep(0.1)
print("main-------")  # 列印該行則主程序程式碼結束,則守護行程p1應該被終止.#可能會有p1任務執行的列印資訊123,因為主程序列印main----時,p1也執行了,但是隨即被終止,p2可以列印出來。

3、socket聊天並行範例

from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': # windows下start程序一定要寫到這下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))

4、程序物件的其他方法:terminate和is_alive

from multiprocessing import Process
import time
import random

class Myprocess(Process):
    def __init__(self,person):
        self.name=person
        super().__init__()

    def run(self):
        print('%s正在和網紅臉聊天' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s還在和網紅臉聊天' %self.name)


p1=Myprocess('哪吒')
p1.start()

p1.terminate()#關閉程序,不會立即關閉,所以is_alive立刻檢視的結果可能還是存活
print(p1.is_alive()) #結果為True

print('開始')
print(p1.is_alive()) #結果為False

5、程序物件的其他屬性:pid和name

class Myprocess(Process):
    def __init__(self,person):
        self.name=person   # name屬性是Process中的屬性,標示程序的名字
        super().__init__()  # 執行父類別的初始化方法會覆蓋name屬性
        # self.name = person  # 在這裡設定就可以修改程序名字了
        # self.person = person  # 如果不想覆蓋程序名,就修改屬性名稱就可以了
    def run(self):
        print('%s正在和網紅臉聊天' %self.name)
        # print('%s正在和網紅臉聊天' %self.person)
        time.sleep(random.randrange(1,5))
        print('%s正在和網紅臉聊天' %self.name)
        # print('%s正在和網紅臉聊天' %self.person)


p1=Myprocess('哪吒')
p1.start()
print(p1.pid)    #可以檢視子程序的程序id

6、參考:

Process([group [, target [, name [, args [, kwargs]]]]]),由該類範例化得到的物件,表示一個子程序中的任務(尚未啟動)

強調:
1.需要使用關鍵字的方式來指定引數
2.args指定的為傳給target函數的位置引數,是一個元組形式,必須有逗號

引數介紹:
•group引數未使用,值始終為None
•target表示呼叫物件,即子程序要執行的任務
•args表示呼叫物件的位置引數元組,args=(1,2,'egon',)
•kwargs表示呼叫物件的字典,kwargs={'name':'egon','age':18}
•name為子程序的名稱

1、 方法介紹
•p.start():啟動程序,並呼叫該子程序中的p.run() 
•p.run():程序啟動時執行的方法,正是它去呼叫target指定的函數,我們自定義類的類中一定要實現該方法
•p.terminate():強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了殭屍程序,使用該方法需要特別小心這種情況。如果p還儲存了一個鎖那麼也將不會被釋放,進而導致死鎖
•p.is_alive():如果p仍然執行,返回True
•p.join([timeout]):主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於執行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的程序,而不能join住run開啟的程序

2、 屬性介紹
•p.daemon:預設值為False,如果設為True,代表p為後臺執行的守護行程,當p的父程序終止時,p也隨之終止,並且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定
•p.name:程序的名稱
•p.pid:程序的pid
•p.exitcode:程序在執行時為None、如果為–N,表示被訊號N結束(瞭解即可)
•p.authkey:程序的身份驗證鍵,預設是由os.urandom()隨機生成的32字元的字串。這個鍵的用途是為涉及網路連線的底層程序間通訊提供安全性,這類連線只有在具有相同的身份驗證鍵時才能成功(瞭解即可)

二、程序同步(multiprocess.Lock)

當多個程序使用同一份資料資源的時候,就會引發資料安全或順序混亂問題。

1、多程序搶佔輸出資源

import os
import time
import random
from multiprocessing import Process


def work(n):
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' % (n, os.getpid()))


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=work, args=(i,))
        p.start()

# 0: 15620 is running
# 1: 19688 is running
# 2: 15892 is running
# 1:19688 is done
# 0:15620 is done
# 2:15892 is done

2、使用鎖維護執行順序

由並行變成了序列,犧牲了執行效率,但避免了競爭,確實會浪費了時間,卻保證了資料的安全。

import os
import time
import random
from multiprocessing import Process,Lock

def work(lock,n):
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release()

if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,i))
        p.start()
        
# 1: 24776 is running
# 1: 24776 is done
# 0: 23588 is running
# 0: 23588 is done
# 2: 27308 is running
# 2: 27308 is done

3、多程序同時搶購餘票

# 檔案db的內容為:{"count":5}
# 注意一定要用雙引號,不然json無法識別
# 並行執行,效率高,但競爭寫同一檔案,資料寫入錯亂
from multiprocessing import Process, Lock
import time, json, random


def search():
    dic = json.load(open('db'))
    print('剩餘票數%s' % dic['count'])


def get():
    dic = json.load(open('db'))
    time.sleep(random.random())  # 模擬讀資料的網路延遲
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(random.random())  # 模擬寫資料的網路延遲
        json.dump(dic, open('db', 'w'))
        print('購票成功')
    else:
        print('購票失敗')


def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):  # 模擬並行10個使用者端搶票
        p = Process(target=task, args=(lock,))
        p.start()

雖然可以用檔案共用資料實現程序間通訊,但問題是:

  • 效率低(共用資料基於檔案,而檔案是硬碟上的資料)
  • 需要自己加鎖處理

因此我們最好找尋一種解決方案能夠兼顧:

  • 效率高(多個程序共用一塊記憶體的資料)
  • 幫我們處理好鎖問題。這就是mutiprocessing模組為我們提供的基於訊息的IPC通訊機制:佇列和管道。

佇列和管道都是將資料存放於記憶體中,佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,我們應該儘量避免使用共用資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。

三、程序間通訊IPC(Inter-Process Communication) (multiprocess.Queue)

1、 概念介紹——佇列multiprocess.Queue

建立共用的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。

Queue([maxsize])建立共用的程序佇列。
引數 :maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。

底層佇列使用管道和鎖定實現。另外,還需要執行支援執行緒以便佇列中的資料傳輸到底層管道中。

2、 方法介紹

  • q.get( [ block [ ,timeout ] ] ):返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。block用於控制阻塞行為,預設為True. 如果設定為False,將引發Queue.Empty異常(定義在Queue模組中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有專案變為可用,將引發Queue.Empty異常。
  • q.get_nowait() :同q.get(False)方法。
  • q.put(item [, block [,timeout ] ] ) :將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,預設為True。如果設定為False,將引發Queue.Empty異常(定義在Queue庫模組中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。
  • q.qsize() :返回佇列中目前專案的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,佇列中可能新增或刪除了專案。在某些系統上,此方法可能引發NotImplementedError異常。
  • q.empty() :如果呼叫此方法時 q為空,返回True。如果其他程序或執行緒正在往佇列中新增專案,結果是不可靠的。也就是說,在返回和使用結果之間,佇列中可能已經加入新的專案。
  • q.full() :如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()方法)。
  • q.close() :關閉佇列,防止佇列中加入更多資料。呼叫此方法時,後臺執行緒將繼續寫入那些已入佇列但尚未寫入的資料,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動呼叫此方法。關閉佇列不會在佇列使用者中生成任何型別的資料結束訊號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的佇列不會導致get()方法返回錯誤。
  • q.cancel_join_thread() :不會再程序退出時自動連線後臺執行緒。這可以防止join_thread()方法阻塞。
  • q.join_thread() :連線佇列的後臺執行緒。此方法用於在呼叫q.close()方法後,等待所有佇列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序呼叫。呼叫q.cancel_join_thread()方法可以禁止這種行為。

3、程式碼範例——multiprocess.Queue

1、 單看佇列用法

這個例子還沒有加入程序通訊,只是先來看看佇列為我們提供的方法,以及這些方法的使用和現象。

'''
multiprocessing模組支援程序間通訊的兩種主要形式:管道和佇列
都是基於訊息傳遞實現的,但是佇列介面
'''

from multiprocessing import Queue

q = Queue(3)

# put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(2)
q.put(1)
# q.put(3)   # 如果佇列已經滿了,程式就會停在這裡,等待資料被別人取走,再將資料放入佇列。
# 如果佇列中的資料一直不被取走,程式就會永遠停在這裡。
try:
    q.put_nowait(3)  # 可以使用put_nowait,如果佇列滿了不會阻塞,但是會因為佇列滿了而報錯。
except:  # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去,但是會丟掉這個訊息。
    print('佇列已經滿了')

# 因此,我們再放入資料之前,可以先看一下佇列的狀態,如果已經滿了,就不繼續put了。
print(q.full())  # True

print(q.get())  # 3
print(q.get())  # 2
print(q.get())  # 1
# print(q.get()) # 同put方法一樣,如果佇列已經空了,那麼繼續取就會出現阻塞。
try:
    q.get_nowait(3)  # 可以使用get_nowait,如果佇列滿了不會阻塞,但是會因為沒取到值而報錯。
except:  # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去。
    print('佇列已經空了')

print(q.empty())  # True,空了

2、 子程序傳送資料給父程序

一個queue的簡單應用,使用佇列q物件呼叫get函數來取得佇列中最先進入的資料。

import time
from multiprocessing import Process, Queue


def f(q):
    q.put([time.asctime(), 'from Eva', 'hello'])  # 呼叫主函數中p程序傳遞過來的程序引數 put函數為向佇列中新增一條資料。


if __name__ == '__main__':
    q = Queue()  # 建立一個Queue物件
    p = Process(target=f, args=(q,))  # 建立一個程序
    p.start()
    print(q.get())
    p.join()

# ['Mon Dec  9 18:27:08 2019', 'from Eva', 'hello']

3、 批次生產資料放入佇列再批次獲取結果

import os
import time
import multiprocessing


# 向queue中輸入資料的函數
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)


# 向queue中輸出資料的函數
def outputQ(queue):
    info= queue.get()
    print('%s%s%s' % (str(os.getpid()), '(get):', info))


# Main
if __name__ == '__main__':
    multiprocessing.freeze_support()
    record1 = []  # store input processes
    record2 = []  # store output processes
   
 queue = multiprocessing.Queue(3)

    # 輸入程序
    for i in range(10):
        process = multiprocessing.Process(target=inputQ, args=(queue,))
        process.start()
        record1.append(process)

    # 輸出程序
    for i in range(10):
        process = multiprocessing.Process(target=outputQ, args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()

# 17568(get):3208(put):Mon Dec  9 18:29:17 2019
# 27620(get):24024(put):Mon Dec  9 18:29:17 2019
# 19780(get):21716(put):Mon Dec  9 18:29:17 2019
# 27576(get):27608(put):Mon Dec  9 18:29:17 2019
# 11304(get):10668(put):Mon Dec  9 18:29:18 2019
# 19732(get):20548(put):Mon Dec  9 18:29:18 2019
# 18120(get):25360(put):Mon Dec  9 18:29:18 2019
# 24752(get):21764(put):Mon Dec  9 18:29:18 2019
# 19848(get):7604(put):Mon Dec  9 18:29:18 2019
# 13888(get):10376(put):Mon Dec  9 18:29:18 2019

4、生產者消費者模型

在並行程式設計中使用生產者和消費者模式能夠解決絕大多數並行問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

1、 為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

2、 什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

3、 基於佇列實現生產者消費者模型

import os
import random
import time
from multiprocessing import Process, Queue


def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('%s 生產了 %s' % (os.getpid(), res))


def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('%s 吃 %s' % (os.getpid(), res))


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q,))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    c1.start()
    print('主')

此時的問題是主程序永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死迴圈中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往佇列中再發一個結束訊號,這樣消費者在接收到結束訊號後就可以break出死迴圈。

4、 改良版——生產者消費者模型

注意:結束訊號None,不一定要由生產者發,主程序裡同樣可以發,但主程序需要等生產者結束後才應該傳送該訊號。

from multiprocessing import Process, Queue
import time, random, os


def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('%s 生產了 %s' % (os.getpid(), res))
    q.put(None)# 傳送結束訊號
def consumer(q):
    while True:
        res = q.get()
        if res is None: break  # 收到結束訊號則結束
        time.sleep(random.randint(1, 3))
        print('%s 吃 %s' % (os.getpid(), res))


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q,))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    c1.start()
    print('主')

5、 主程序在生產者生產完畢後傳送結束訊號None

from multiprocessing import Process, Queue
import time, random, os


def producer(q):
    for i in range(2):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('%s 生產了 %s' % (os.getpid(), res))


def consumer(q):
    while True:
        res = q.get()
        if res is None: break  # 收到結束訊號則結束
        time.sleep(random.randint(1, 3))
        print('%s 吃 %s' % (os.getpid(), res))


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q,))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    c1.start()

    p1.join()
    q.put(None)# 傳送結束訊號
    print('主')

但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決

6、 多個消費者的例子:有幾個消費者就需要傳送幾次結束訊號

from multiprocessing import Process, Queue
import time, random, os


def producer(name, q):
    for i in range(2):
        time.sleep(random.randint(1, 3))
        res = '%s%s' % (name, i)
        q.put(res)
        print('%s 生產了 %s' % (os.getpid(), res))


def consumer(q):
    while True:
        res = q.get()
        if res is None: break  # 收到結束訊號則結束
        time.sleep(random.randint(1, 3))
        print('%s 吃 %s' % (os.getpid(), res))


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨頭', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join()  # 必須保證生產者全部生產完畢,才應該傳送結束訊號
    p2.join() 
    p3.join()
    q.put(None) # 有幾個消費者就應該傳送幾次結束訊號None 
    q.put(None) # 傳送結束訊號     
    print('主')

5、JoinableQueue([maxsize])可連線的共用程序佇列

建立可連線的共用程序佇列。這就像是一個Queue物件,但佇列允許專案的使用者通知生產者專案已經被成功處理。通知程序是使用共用的訊號和條件變數來實現的。

1、 方法介紹

JoinableQueue的範例p除了與Queue物件相同的方法之外,還具有以下方法:

  • q.task_done():使用者使用此方法發出訊號,表示q.get()返回的專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除的專案數量,將引發ValueError異常。
  • q.join():生產者將使用此方法進行阻塞,直到佇列中所有專案均被處理。阻塞將持續到為佇列中的每個專案均呼叫q.task_done()方法為止。
    下面的例子說明如何建立永遠執行的程序,使用和處理佇列上的專案。生產者將專案放入佇列,並等待它們被處理。

2、 JoinableQueue佇列實現消費之生產者模型

from multiprocessing import Process, JoinableQueue
import time, random, os


def producer(name, q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '%s%s' % (name, i)
        q.put(res)
        print('%s 生產了 %s' % (os.getpid(), res))
    q.join()  # 生產完畢,使用此方法進行阻塞,直到佇列中所有專案均被處理。


def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('%s 吃 %s' % (os.getpid(), res))
        q.task_done()  # 向q.join()傳送一次訊號,證明一個資料已經被取走了


if __name__ == '__main__':
    q = JoinableQueue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨頭', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True
    c2.daemon = True

    # 開始
    p_l = [p1, p2, p3, c1, c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('主')

    # 主程序等--->p1,p2,p3等---->c1,c2
    # p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到佇列的資料
    # 因而c1,c2也沒有存在的價值了,不需要繼續阻塞在程序中影響主程序了。應該隨著主程序的結束而結束,所以設定成守護行程就可以了。

四、程序池(multiprocess.Pool)

1、概念介紹——multiprocess.Pool

Pool([numprocess [,initializer [, initargs]]]):建立程序池

  • numprocess:要建立的程序數,如果省略,將預設使用cpu_count()的值
  • initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None
  • initargs:是要傳給initializer的引陣列

2、主要方法

  • p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。需要強調的是:此操作並不會在所有池工作程序中並執行func函數。如果要通過不同引數並行地執行func函數,必須從不同執行緒呼叫p.apply()函數或者使用p.apply_async()
  • p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。此方法的結果是AsyncResult類的範例,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。
  • p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成
  • P.join():等待所有工作程序退出。此方法只能在close()teminate()之後呼叫
  • obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠端操作中引發了異常,它將在呼叫此方法時再次被引發。
  • obj.ready():如果呼叫完成,返回True
  • obj.successful():如果呼叫完成且沒有引發異常,返回True,如果在結果就緒之前呼叫此方法,引發異常
  • obj.wait([timeout]):等待結果變為可用。
  • obj.terminate():立即終止所有工作程序,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動呼叫此函數。

3、程式碼範例——multiprocess.Pool

1、 同步

import os,time
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    p=Pool(3)
 #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,))
 # 同步呼叫,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
                                    # 但不管該任務是否存在阻塞,同步呼叫都會在原地等著
    print(res_l)

2、 非同步

import os
import time
import random
from multiprocessing import Pool


def work(n):
    print('%s run' % os.getpid())
    time.sleep(random.random())
    return n ** 2


if __name__ == '__main__':
    p = Pool(3)  # 程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
    res_l = []
    for i in range(10):
        res= p.apply_async(work, args=(i,))
  # 非同步執行,根據程序池中有的程序數,每次最多3個子程序在非同步執行
        # 返回結果之後,將結果放入列表,歸還程序,之後再執行新的任務
        # 需要注意的是,程序池中的三個程序不會同時開啟或者同時結束
        # 而是執行完一個就釋放一個程序,這個程序就去接收新的任務。
        res_l.append(res)

    # 非同步apply_async用法:如果使用非同步提交的任務,主程序需要使用join,等待程序池內任務都處理完,然後可以用get收集結果
    # 否則,主程序結束,程序池可能還沒來得及執行,也就跟著一起結束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get()
 )  # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

4、程序池版socket並行聊天練習

1、 server

#Pool內的程序數預設是cpu核數,假設為4(檢視方法os.cpu_count())
#開啟6個使用者端,會發現2個使用者端處於等待狀態
#在每個程序內檢視pid,會發現pid使用為4個,即多個使用者端公用4個程序
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('程序pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個使用者端能存取

2、 client

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

發現:並行開啟多個使用者端,伺服器端同一時間只有4個不同的pid,只能結束一個使用者端,另外一個使用者端才會進來。

5、回撥函數

需要回撥函數的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序:我好了額,你可以處理我的結果了。主程序則呼叫一個函數去處理該結果,該函數即回撥函數

我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回撥函數(主程序負責執行),這樣主程序在執行回撥函數時就省去了I/O的過程,直接拿到的是任務的結果。

1、 使用多程序請求多個url來減少網路等待浪費的時間

from multiprocessing import Pool
import requests
import json
import os


def get_page(url):
    print('<程序%s> get %s' % (os.getpid(), url))
    respone = requests.get(url)
    if respone.status_code == 200:
        return {'url': url, 'text': respone.text}


def pasrse_page(res):
    print('<程序%s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]n' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p = Pool(3)
    res_l = []
    for url in urls:
        res =p.apply_async(get_page, args=(url,), callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l])  # 拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回撥函數處理了

'''
列印結果:
<程序3388> get https://www.baidu.com
<程序3389> get https://www.python.org
<程序3390> get https://www.openstack.org
<程序3388> get https://help.github.com/
<程序3387> parse https://www.baidu.com
<程序3389> get http://www.sina.com.cn/
<程序3387> parse https://www.python.org
<程序3387> parse https://help.github.com/
<程序3387> parse http://www.sina.com.cn/
<程序3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>rn...',...}]
'''

2、 爬蟲範例

import re
from urllib.request import urlopen
from multiprocessing import Pool

def get_page(url,pattern):
    response=urlopen(url).read().decode('utf-8')
    return pattern,response

def parse_page(info):
    pattern,page_content=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0].strip(),
            'title':item[1].strip(),
            'actor':item[2].strip(),
            'time':item[3].strip(),
        }
        print(dic)
if __name__ == '__main__':
    regex = r'<dd>.*?<.*?class="board-index.*?>(d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
    pattern1=re.compile(regex,re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),
         callback=parse_page) res_l.append(res) for i in res_l: i.get()

6、無需回撥函數

如果在主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回撥函數。

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待程序池中所有程序執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get() ) #拿到所有結果
    print(nums) #主程序拿到所有的處理結果,可以在主程序中進行統一進行處理

程序池的其他實現方法:https://docs.python.org/dev/library/concurrent.futures.html

到此這篇關於Python程序操作模組的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com