首頁 > 軟體

Python3實時操作處理紀錄檔檔案的實現

2023-03-29 06:01:31

最近,需要對紀錄檔檔案進行實時資料處理。

一、簡單的實時檔案處理(單一檔案)

假設我們要實時讀取的紀錄檔的路徑為: /data/mongodb/shard1/log/pg.csv

那麼我們可以在python檔案中使用shell指令碼命令tail -F 進行實時讀取並操作

程式碼如下:

import re
import codecs
import subprocess
 
def pg_data_to_elk():
    p = subprocess.Popen('tail -F /data/mongodb/shard1/log/pg.csv', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,)    #起一個程序,執行shell命令
    while True:
        line = p.stdout.readline()   #實時獲取行
        if line:                     #如果行存在的話
            xxxxxxxxxxxx
            your operation

簡單解釋一下subprocess模組:

subprocess允許你生成新的程序,連線到它們的 input/output/error 管道,並獲取它們的返回(狀態)碼。

subprocess.Popen介紹

該類用於在一個新的程序中執行一個子程式。

subprocess.Popen的建構函式

class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, 
    preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False,
    startup_info=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=())

引數說明:

  • args: 要執行的shell命令,可以是字串,也可以是命令各個引陣列成的序列。當該引數的值是一個字串時,該命令的解釋過程是與平臺相關的,因此通常建議將args引數作為一個序列傳遞。
  • stdin, stdout, stderr: 分別表示程式標準輸入、輸出、錯誤控制程式碼。
  • shell: 該引數用於標識是否使用shell作為要執行的程式,如果shell值為True,則建議將args引數作為一個字串傳遞而不要作為一個序列傳遞。

二、複雜的實時檔案處理(不斷產生新檔案)

如果紀錄檔會在滿足一定條件下產生新的紀錄檔檔案,比如log1.csv已經到了20M,那麼則會寫入log2.csv,這樣一天下來大概有1000多個檔案,且不斷產生新的,那麼如何進行實時獲取呢?

思路如下:

在實時監聽(tail -F)中加入當前檔案的大小判定,如果當前檔案大小大於20M,那麼跳出實時監聽,獲取新的紀錄檔檔案。(如果有其他判定條件也是這個思路,只不過把當前檔案大小的判定換成你所需要的判定)

程式碼如下:

import re
import os
import time
import codecs
import subprocess
from datetime import datetime
 
path = '/home/liao/python/csv'
time_now_day = datetime.now.strftime('%Y-%m-%d')
 
def get_file_size(new_file):
    fsize = os.path.getsize(new_file)
    fsize = fsize/float(1024*1024)
    return fsize
 
def get_the_new_file():
    files = os.listdir(path)
    files_list = list(filter(lambda x:x[-4:]=='.csv' and x[11:21]==time_now_day, files))
    files_list.sort(key=lambda fn:os.path.getmtime(path + '/' + fn) if not os.path.isdir(path + '/' + fn) else 0)
    new_file = os.path.join(path, files_list[-1])
    return new_file
 
def pg_data_to_elk():
    while True:
        new_file = get_the_new_file()
        p = subprocess.Popen('tail -F {0}'.format(new_file), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,)    #起一個程序,執行shell命令
        while True:
            line = p.stdout.readline()   #實時獲取行
            if line:                     #如果行存在的話
                if get_file_size(new_file) > 20:    #如果大於20M,則跳出迴圈
                    break
                xxxxxxxxxxxx
                your operation
        time.sleep(3)

到此這篇關於Python3實時操作處理紀錄檔檔案的實現的文章就介紹到這了,更多相關Python3實時操作紀錄檔檔案內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com