首頁 > 軟體

Python讀取檔案的四種方式的範例詳解

2022-03-25 16:00:59

故事背景:最近在處理Wikipedia的資料時發現由於資料量過大,之前的檔案讀取和資料處理方法幾乎不可用,或耗時非常久。今天學校安排統一核酸檢查,剛好和檔案讀取的過程非常相似。正好藉此機會和大家一起從頭梳理一下幾種檔案讀取方法。

故事設定:現在學校要求對所有同學進行核酸採集,每位同學先在宿舍內等候防護人員(以下簡稱“大白”)叫號,叫到自己時去停車場排隊等候大白對自己進行採集,採集完之後的樣本由大白統一有序收集並儲存。

名詞解釋:

  • 學生:所有的學生是一個大檔案,每個學生是其中的一行資料
  • 宿舍:硬碟
  • 停車場:記憶體
  • 核酸採集:資料處理
  • 樣本:處理後的資料
  • 大白:程式

學生數量特別少的情況

當學生數量特別少時,可以考慮將所有學生統一叫到停車場等候,再依次進行核酸採集。

方法一:簡單情況

此時的程式可以模擬為:

import time
from typing import List
 
 
def pick_all_students(dorm: str) -> List[str]:
    with open(dorm, "rt", encoding="utf8") as fin:
        students = fin.readlines()
        return students
 
 
def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample
 
 
def process(dorm: str, sample_storeroom: str) -> None:
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        students = pick_all_students(dorm)
        for student in students:
            sample = pick_sample(student)
            fout.write(f"{sample}n")
            fout.flush()
 
 
if __name__ == "__main__":
    process(
        "student_names.txt",
        "sample_storeroom.txt"
    )

注意,在第19行中,大白一次性把所有同學都叫到了停車場中。這種做法在學生比較少時做起來很快,但是如果學生特別多,停車場裝不下怎麼辦?

停車場空間不夠時怎麼辦?

方法二:邊讀邊處理

一般來說,由於停車場空間有限,我們不會採用一次性把所有學生都叫到停車場中,而是會一個一個地處理,這樣可以節約記憶體空間。

import time
from typing import Iterator
 
 
def pick_one_student(dorm: str) -> Iterator[str]:
    with open(dorm, "rt", encoding="utf8") as fin:
        for student in fin:
            yield student
 
 
def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample
 
 
def process(dorm: str, sample_storeroom: str) -> None:
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        for student in pick_one_student(dorm):
            sample = pick_sample(student)
            fout.write(f"{sample}n")
            fout.flush()
 
 
if __name__ == "__main__":
    process(
        "student_names.txt",
        "sample_storeroom.txt"
    )

這裡pick_one_student函數中的返回值是用yield返回的,一次只會返回一名同學。

不過,這種做法雖然確保了停車場不會滿員,但是這種做法在人數特別多的時候就不再適合了。雖然可以保證完成任務,但由於每次只能採集一個同學,程式的執行並不高。特別是當你的CPU有多個核時,會浪費機器效能,出現一核有難,其它圍觀的現象。

怎麼加快執行效率?

大家可能也已經注意到了,剛剛我們的場景中,不論採用哪種方法,都只有一名大白在工作。那我們能不能加派人手,從而提高效率呢?

答案當然是可行的。我們現在先考慮增加兩名大白,使得一名大白專注於叫號,安排學生進入停車場,另外一名大白專注於採集核酸,最後一名大白用於儲存核酸樣本。

方法三

import time
from multiprocessing import Queue, Process
from typing import Iterator
 
 
def pick_student(stu_queue: Queue, dorm: str) -> Iterator[str]:
    print("pick_student: started")
 
    picked_num = 0
    with open(dorm, "rt", encoding="utf8") as fin:
        for student in fin:
            stu_queue.put(student)
            picked_num += 1
            if picked_num % 500 == 0:
                print(f"pick_student: {picked_num}")
 
    # end signal
    stu_queue.put(None)
    print("pick_student: finished")
 
 
def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample
 
 
def process(stu_queue: Queue, store_queue: Queue) -> None:
    print("process: started")
 
    process_num = 0
    while True:
        student = stu_queue.get()
        if student is not None:
            sample = pick_sample(student)
            store_queue.put(sample)
            process_num += 1
            if process_num % 500 == 0:
                print(f"process: {process_num}")
        else:
            break
 
    # end signal
    store_queue.put(None)
    print("process: finished")
 
 
def store_sample(store_queue: Queue, sample_storeroom: str) -> None:
    print("store_sample: started")
 
    store_num = 0
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        while True:
            sample = store_queue.get()
            if sample is not None:
                fout.write(f"{sample}n")
                fout.flush()
 
                store_num += 1
                if store_num % 500 == 0:
                    print(f"store_sample: {store_num}")
            else:
                break
 
    print("store_sample: finished")
 
 
if __name__ == "__main__":
    dorm = "student_names.txt"
    sample_storeroom = "sample_storeroom.txt"
 
    stu_queue = Queue()
    store_queue = Queue()
 
    store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)
    store_p.start()
    process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)
    process_p.start()
    read_p = Process(target=pick_student, args=(stu_queue, dorm), daemon=True)
    read_p.start()
 
    store_p.join()

這份程式碼中,我們引入了多程序的思路,將每個大白看作一個程序,並使用了佇列Queue作為程序間通訊的媒介。stu_queue表示學生叫號進停車場的佇列,store_queue表示已經採集過的待儲存核酸樣本的佇列。

此外,為了控制程序的停止,我們在pick_student和 process函數的最後都向各自佇列中新增了None作為結束標誌符。

假設有1w名學生(student_names.txt檔案有1w行),經過測試後發現上述方法的時間如下:

  • 方法一:1m40.716s
  • 方法二:1m40.717s
  • 方法三:1m41.097s

咦?不是做了分工嗎?怎麼速度還變慢了?經筆者觀察,這是因為叫號的大白速度太快了(檔案讀取速度快)通常是TA已經齊活了,另外倆人還在吭哧吭哧幹活呢,體現不出來分工的優勢。如果這個時候我們對法二和法三的叫號做延時操作,每個學生叫號之後停滯10ms再叫下一位學生,則方法三的處理時間幾乎不變,而方法二的時間則會延長至3m21.345s。

怎麼加快處理速度?

上面提到,大白採核酸的時間較長,往往上一個人的核酸還沒采完,下一個人就已經在後面等著了。我們能不能提高核酸採集這個動作(資料處理)的速度呢?其實一名大白執行一次核酸採集的時間我們幾乎無法再縮短了,但是我們可以通過增加人手的方式,來達到這個目的。就像去銀行辦業務,如果開放的視窗越多,那麼每個人等待的時間就會越短。這裡我們也採取類似的策略,增加核酸採集的視窗。

import time
from multiprocessing import Queue, Process, cpu_count
from typing import Iterator
 
 
def pick_student(stu_queue: Queue, dorm: str, num_workers: int) -> Iterator[str]:
    print("pick_student: started")
 
    picked_num = 0
    with open(dorm, "rt", encoding="utf8") as fin:
        for student in fin:
            stu_queue.put(student)
            picked_num += 1
            if picked_num % 500 == 0:
                print(f"pick_student: {picked_num}")
 
    # end signal
    for _ in range(num_workers):
        stu_queue.put(None)
 
    print("pick_student: finished")
 
 
def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample
 
 
def process(stu_queue: Queue, store_queue: Queue) -> None:
    print("process: started")
 
    process_num = 0
    while True:
        student = stu_queue.get()
        if student is not None:
            sample = pick_sample(student)
            store_queue.put(sample)
            process_num += 1
            if process_num % 500 == 0:
                print(f"process: {process_num}")
        else:
            break
 
    print("process: finished")
 
 
def store_sample(store_queue: Queue, sample_storeroom: str) -> None:
    print("store_sample: started")
 
    store_num = 0
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        while True:
            sample = store_queue.get()
            if sample is not None:
                fout.write(f"{sample}n")
                fout.flush()
 
                store_num += 1
                if store_num % 500 == 0:
                    print(f"store_sample: {store_num}")
            else:
                break
 
    print("store_sample: finished")
 
 
if __name__ == "__main__":
    dorm = "student_names.txt"
    sample_storeroom = "sample_storeroom.txt"
    num_process = max(1, cpu_count() - 1)
 
    maxsize = 10 * num_process
    stu_queue = Queue(maxsize=maxsize)
    store_queue = Queue(maxsize=maxsize)
 
    store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)
    store_p.start()
    process_workers = []
    for _ in range(num_process):
        process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)
        process_p.start()
        process_workers.append(process_p)
    read_p = Process(target=pick_student, args=(stu_queue, dorm, num_process), daemon=True)
    read_p.start()
 
    for worker in process_workers:
        worker.join()
 
    # end signal
    store_queue.put(None)
    store_p.join()

總耗時 0m4.160s !我們來具體看看其中的細節部分:

首先我們將CPU核數 - 3作為採核酸的大白數量。這裡減3是為其它工作程序保留了一些資源,你也可以根據自己的具體情況做調整

這次我們在 Queue中增加了 maxsize引數,這個引數是限制佇列的最大長度,這個引數通常與你的實際記憶體情況有關。如果資料特別多時要考慮做些調整。這裡我採用10倍的工作程序數目作為佇列的長度

注意這裡pick_student函數中要為每個後續的工作程序都新增一個結束標誌,因此最後會有個for迴圈

我們把之前放在process函數中的結束標誌提取出來,放在了最外側,使得所有工作程序均結束之後再關閉最後的store_p程序

結語

總結來說,如果你的資料集特別小,用法一;通常情況下用法二;資料集特別大時用法四。

以上就是Python讀取檔案的四種方式的範例詳解的詳細內容,更多關於Python讀取檔案的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com