首頁 > 軟體

Python詳解複雜CSV檔案處理方法

2022-07-19 18:00:16

專案簡介

鑑於專案保密的需要,不便透露太多專案的資訊,因此,簡單介紹一下專案存在的難點:

  • 海量資料:專案是對CSV檔案中的資料進行處理,而特點是資料量大...真的大!!!拿到的第一個CSV範例檔案是110多萬行(小CASE),而第二個檔案就到了4500萬行,等到第三個檔案......好吧,一直沒見到第三個完整範例檔案,因為太大了,據說是第二個範例檔案的40多倍,大概二十億行......
  • 業務邏輯複雜:專案是需要對CSV檔案的每一行資料的各種組合可能性進行判斷,而判斷的業務邏輯較為複雜,如何在解決複雜邏輯的同時保證較高的處理效率是難點之一。

專案筆記與心得

1.分批次處理與多程序及多執行緒加速

  • 因為資料量太大,肯定是要分批對資料進行處理,否則,效率低不談,大概率也沒有足夠的記憶體能夠支撐,需要用到chunksize,此外,為了節約記憶體,以及提高處理效率,可以將文字類的資料儲存為“category”格式:
  • 專案整體是計算密集型的任務,因此,需要用到多程序,充分利用CPU的多核效能;
  • 多執行緒進行讀取與寫入,其中,寫入使用to_csv的增量寫入方法,mode引數設定為'a';
  • 多程序與多執行緒開啟一般為死迴圈,需要在合適的位置,放入結束迴圈的訊號,以便處理完畢後退出多程序或多執行緒
"""鑑於專案保密需要,以下程式碼僅為範例"""
import time
import pathlib as pl
import pandas as pd
from threading import Thread
from multiprocessing import Queue, Process, cpu_count
# 匯入多執行緒Thread,多程序的佇列Queue,多程序Process,CPU核數cpu_count
# 存放分段讀取的資料佇列,注:maxsize控制佇列的最大數量,避免一次性讀取到記憶體中的資料量太大
data_queue = Queue(maxsize=cpu_count() * 2)  
# 存放等待寫入磁碟的資料佇列
write_queue = Queue()  
def read_data(path: pl.Path, data_queue: Queue, size: int = 10000):
    """
    讀取資料放入佇列的方法
    :return:
    """
    data_obj = pd.read_csv(path, sep=',', header=0, chunksize=size, dtype='category')
    for idx, df in enumerate(data_obj):
        while data_queue.full():  # 如果佇列滿了,那就等待
            time.sleep(1)
        data_queue.put((idx + 1, df))
    data_queue.put((None, None))  # 放入結束訊號
def write_data(out_path: pl.Path, write_queue: Queue):
    """
    將資料增量寫入CSV的方法
    :return:
    """
    while True:
        while write_queue.empty():
            time.sleep(1)
        idx, df = write_queue.get()
        if df is None:
            return  # 結束退出
        df.to_csv(out_path, mode='a', header=None, index=False, encoding='ansi')  # 輸出CSV
def parse_data(data_queue: Queue, write_queue: Queue):
    """
    從佇列中取出資料,並加工的方法
    :return:
    """
    while True:
        while write_queue.empty():
            time.sleep(1)
        idx, df = data_queue.get()
        if df is None:  # 如果是空的結束訊號,則結束退出程序,
        # 特別注意結束前把結束訊號放回佇列,以便其他程序也能接收到結束訊號!!!
            data_queue.put((idx, df))
            return
        """處理資料的業務邏輯略過"""
        write_queue.put((idx, df))  # 將處理後的資料放入寫佇列
# 建立一個讀取資料的執行緒
read_pool = Thread(target=read_data, args=(read_data_queue, *args))
read_pool.start()  # 開啟讀取執行緒
# 建立一個增量寫入CSV資料的執行緒
write_pool = Thread(target=write_data, args=(write_data_queue, *args))
write_pool.start()  # 開啟寫程序
pools = []  # 存放解析程序的佇列
for i in range(cpu_count()):  # 迴圈開啟多程序,不確定開多少個程序合適的情況下,那麼按CPU的核數開比較合理
    pool = Process(target=parse_data, args=(read_data_queue, write_data_queue, *args))
    pool.start()  # 啟動程序
    pools.append(pool)  # 加入佇列
for pool in pools:
    pool.join()  # 等待所有解析程序完成
# 所有解析程序完成後,在寫佇列放入結束寫執行緒的訊號
write_data_queue.put((None, None))  
write_pool.join()  # 等待寫執行緒結束
print('任務完成')

2.優化演演算法提高效率

將類物件存入dataframe列

在嘗試了n種方案之後,最終使用了將類物件存到dataframe的列中,使用map方法,執行類方法,最後,將執行結果展開到多列中的方式。該方案本專案中取得了最佳的處理效率。

"""鑑於保密需要,以下程式碼僅為範例"""
class Obj:
    def __init__(self, ser: pd.Series):
        """
        初始化類物件
        :param ser: 傳入series
        """
        self.ser = ser  # 行資料
        self.attrs1 = []  # 屬性1
        self.attrs2 = []  # 屬性2
        self.attrs3 = []  # 屬性3
    def __repr__(self):
        """
        自定義輸出
        """
        attrs1 = '_'.join([str(a) for a in self.attrs1])
        attrs2 = '_'.join([str(a) for a in self.attrs2])
        attrs3 = '_'.join([str(a) for a in self.attrs3])
        return '_'.join([attrs1, attrs2, attrs3])
    def run(self):
        """執行業務邏輯"""
# 建立obj列,存入類物件
data['obj'] = data.apply(lambda x: Obj(x), axis=1)
# 執行obj列中的類方法獲得判斷結果
data['obj'] = data['obj'].map(lambda x: x.run())
# 鏈式呼叫,1將類物件文字化->2拆分到多列->3刪除空列->4轉換為category格式
data[['col1', 'col2', 'col3', ...省略]] = data['obj'].map(str).str.split('_', expand=True).dropna(axis=1).astype('category')
# 刪除obj列
data.drop(columns='obj', inplace=True)  

減少計算次數以提高執行效率

在整個優化過程中,對執行效率產生最大優化效果的有兩項:

  • 一是改變遍歷演演算法,採用直接對整行資料進行綜合判斷的方法,使原需要遍歷22個組合的計算與判斷大大減少
  • 二是提前計算特徵組合,製作成字典,後續直接查詢結果,而不再進行重複計算

使用numpy加速計算

numpy還是資料處理上的神器,使用numpy的方法,比自己實現的方法效率要高非常多,本專案中就用到了:bincount、argsort,argmax、flipud、in1d、all等,即提高了執行效率,又解決了邏輯判斷的問題:

"""numpy方法使用範例"""
import numpy as np
# 計算數位的個陣列合bincount
np.bincount([9, 2, 13, 12, 9, 10, 11])
# 輸出結果:array([0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 1, 1, 1, 1], dtype=int64)
# 取得個數最多的數位argmax
np.argmax(np.bincount([9, 2, 13, 12, 9, 10, 11]))
# 輸出結果: 9
# 將數位按照個數優先,其次大小進行排序argsort
np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11]))
# 輸出結果:array([ 0,  1,  3,  4,  5,  6,  7,  8,  2, 10, 11, 12, 13,  9], dtype=int64)
# 翻轉列表flipud
np.flipud(np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11])))
# 輸出結果: array([ 9, 13, 12, 11, 10,  2,  8,  7,  6,  5,  4,  3,  1,  0], dtype=int64)
# 查詢相同值in1d
np.in1d([2, 3, 4], [2, 9, 3])
# 輸出結果: array([ True,  True, False]) 注:指2,3True,4False
np.all(np.in1d([2, 3], [2, 9, 3]))
# 輸出結果: array([ True,  True])
# 是否全是all
np.all(np.in1d([2, 3, 4], [2, 9, 3]))  # 判斷組合1是否包含在組合2中
# 輸出結果: False
np.all(np.in1d([2, 3], [2, 9, 3]))
# 輸出結果: True

優化前後的效率對比

總結

優化演演算法是在這個專案上時間花費最多的工作(沒有之一)。4月12日接單,10天左右出了第1稿,雖能執行,但回頭看存在兩個問題:一是有bug需要修正,二是執行效率不高(4500萬行資料,執行需要1小時21分鐘,如果只是在這個版本上debug需要增加判斷條件,效率只會更低);後20多天是在不斷的優化演演算法的同時對bug進行修正,最後版本執行相同資料只需要不足30分鐘,效率提高了一倍多。回顧來看,雖然調優花費的時間多,但是每一個嘗試不論成功還是失敗都是一次寶貴的經驗積累。

到此這篇關於Python詳解複雜CSV檔案處理方法的文章就介紹到這了,更多相關Python CSV檔案處理內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com