首頁 > 軟體

Python實現實時增量資料載入工具的解決方案

2022-02-21 19:00:19

本次主要分享結合單例模式實際應用案例:實現實時增量資料載入工具的解決方案。最關鍵的是實現一個可進行新增、修改、刪除等操作的增量ID記錄表。

單例模式:提供全域性存取點,確保類有且只有一個特定型別的物件。通常用於以下場景:紀錄檔記錄或資料庫操作等,避免對用一資源請求衝突。

建立增量ID記錄表

import sqlite3
import datetime
import pymssql
import pandas as pd
import time
pd.set_option('expand_frame_repr', False)

匯入所需模組

 # 建立資料表
database_path = r'.DatabaseID_Record.db'
from sqlite3 import connect

with connect(database_path) as conn:
    conn.execute(
        'CREATE TABLE IF NOT EXISTS Incremental_data_max_id_record(id INTEGER PRIMARY KEY AUTOINCREMENT,F_SDaqID_MAX TEXT,record_date datetime)')

增量最新記錄ID-F_SDaqID_MAX資料庫儲存

#資料儲存到本地txt
def text_save(filename, record):#filename為寫入txt檔案的路徑,record為要寫入F_SDaqID_MAX、record_date資料列表.
    file = open(filename,'a') 追加方式
    # file = open(filename, 'w')  #覆蓋方式
    for i in range(len(record)):
        s = str(record[i]).replace('[','').replace(']','')
        s = s.replace("'",'').replace(',','') +'n'   #去除單引號,逗號,每行末尾追加換行符
        file.write(s)
    file.close()

增量最新記錄ID-F_SDaqID_MAX臨時檔案儲存

增量ID記錄提供了兩種實現方案 ,一個是資料持久化儲存模式,另一個是臨時檔案儲存模式。資料持久化模式顧名思義,也就是說在建立物件的時候,能將操作關鍵資訊如增量ID-F_SDaqID_MAX記錄下來,這種flag記錄對映是常選擇的設計模式。

資料庫連線類

實現實時增量資料獲取需要實現兩個資料庫連線類:增量資料ID儲存類和增量目標資料來源類。這裡利用單例模式實現資料庫操作類,將增量服務記錄資訊按照順序儲存到資料庫或特定的紀錄檔檔案中,以維護資料的一致性。

1、增量資料ID儲存sqlite連線類程式碼

class Database_sqlite(metaclass=MetaSingleton):
    database_path = r'.Databaseenergy_rc_configure.db'
    connection = None
    def connect(self):
        if self.connection is None:
            self.connection = sqlite3.connect(self.database_path,check_same_thread=False,isolation_level=None)
            self.cursorobj =  self.connection.cursor()
        return self.cursorobj,self.connection

    # 插入最大記錄
    @staticmethod
    def Insert_Max_ID_Record(f1, f2):

        cursor = Database_sqlite().connect()
        print(cursor)

        sql = f"""insert into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values("{f1}","{f2}")"""
        cursor[0].execute(sql)

        # sql = "insert  into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values(?,?)"
        # cursor[0].execute(sql,(f"{f1}",f"{f2}"))

        cursor[1].commit()
        print("插入成功!")
        # cursor[0].close()
        return 

    # 取出增量資料庫中最新一次ID記錄
    @staticmethod
    def View_Max_ID_Records():

        cursor = Database_sqlite().connect()
        sql = "select max(F_SDaqID_MAX) from Incremental_data_max_id_record"
        cursor[0].execute(sql)
        results = cursor[0].fetchone()[0]
        # #單例模式不用關閉資料庫連線
        # cursor[0].close()
        print("最新記錄ID", results)
        return results

    #刪除資料記錄ID
    @staticmethod
    def Del_Max_ID_Records():
        cursor = Database_sqlite().connect()
        sql = "delete from Incremental_data_max_id_record where record_date = (select MAX(record_date) from Incremental_data_max_id_record)"
        cursor[0].execute(sql)
        # results = cursor[0].fetchone()[0]
        # # cursor[0].close()
        cursor[1].commit()
        print("刪除成功")
        return

2、增量資料來源sqlserver連線類程式碼

class Database_sqlserver(metaclass=MetaSingleton):
    """
    #實時資料庫
    """
    connection = None

    # def connect(self):
    def __init__(self):
        if self.connection is None:
            self.connection = pymssql.connect(host="xxxxx",user="xxxxx",password="xxxxx",database="xxxxx",charset="utf8")
            if self.connection:
                print("連線成功!")
            # 開啟資料庫連線
            self.cursorobj = self.connection.cursor()
        # return self.cursorobj, self.connection

    # 獲取資料來源中最大ID
    @staticmethod
    def get_F_SDaqID_MAX():
        # cursor_insert = Database_sqlserver().connect()
        cursor_insert = Database_sqlserver().cursorobj

        sql_MAXID = """select MAX(F_SDaqID) from T_DaqDataForEnergy"""

        cursor_insert.execute(sql_MAXID)  # 執行查詢語句,選擇表中所有資料

        F_SDaqID_MAX = cursor_insert.fetchone()[0]  # 獲取記錄

        print("最大ID值:{0}".format(F_SDaqID_MAX))

        return F_SDaqID_MAX

    # 提取增量資料
    @staticmethod
    def get_incremental_data(incremental_Max_ID):
        # 開始獲取增量資料
        sql_incremental_data = """select F_ID,F_Datetime,F_Data from T_DaqDataForEnergy  where F_ID > {0}""".format(
            incremental_Max_ID)

        # cursor_find = Database_sqlserver().connect()
        cursor_find = Database_sqlserver().cursorobj

        cursor_find.execute(sql_incremental_data)  # 執行查詢語句,選擇表中所有資料

        Target_data_source = cursor_find.fetchall()  # 獲取所有資料記錄

        # cursor_find.close()
        cursor_find.close()

        df = pd.DataFrame(
            Target_data_source,
            columns=[
                "F_ID",
                "F_Datetime",
                "F_Data"])

        print("提取資料", df)
        return df

資料資源應用服務設計主要考慮資料庫操作的一致性和優化資料庫的各種操作,提高記憶體或CPU利用率。

實現多種讀取和寫入操作,使用者端操作呼叫API,執行相應的DB操作。

注:

1、使用metaclass實現建立具有單例特徵的類

Database_sqlserver(metaclass=MetaSingleton)

Database_sqlite(metaclass=MetaSingleton)

使用class定義新類時,資料庫類Database_sqlserver由MetaSingleton裝飾後即指定了metaclass,那麼MetaSingleton的特殊方法__call__方法將自動執行。

class MetaSingleton(type):
    _instances={}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(MetaSingleton,cls).__call__(*args,**kwargs)
        return cls._instances[cls]

以上程式碼基於元類的單例實現,當用戶端對資料庫執行某些操作時,會多次範例化資料庫類,但是隻建立一個物件,所以對資料庫的呼叫是同步的。

2、多執行緒使用同一資料庫連線資源需採取一定同步機制

如果沒采用同步機制,可能出現一些意料之外的情況

1)with cls.lock加鎖

class MetaSingleton(type):
    _instances={}
    lock = threading.Lock()
    def __call__(cls, *args, **kwargs):
        with cls.lock:
            if cls not in cls._instances:
                time.sleep(0.05)  #模擬耗時
                cls._instances[cls] = super(MetaSingleton,cls).__call__(*args,**kwargs)
            return cls._instances[cls]

鎖的建立和釋放需要消耗資源,上面程式碼每次建立都必須獲得鎖。

3、如果我們開發的程式非單個應用,而是叢集化的,即多個使用者端共用單個資料庫,導致資料庫操作無法同步,而資料庫連線池是更好的選擇。大大節省了記憶體,提高了伺服器地服務效率,能夠支援更多的客戶服務。

資料庫連線池的解決方案是在應用程式啟動時建立足夠的資料庫連線,並講這些連線組成一個連線池,由應用程式動態地對池中的連線進行申請、使用和釋放。對於多於連線池中連線數的並行請求,應該在請求佇列中排隊等待。

增量資料服務使用者端

增量處理策略:第一次載入先判斷增量資料表中是否存在最新記錄,若有直接載入;否則,記錄一下最大/最新的資料記錄ID或時間點,儲存到一個增量資料庫或記錄檔案中。

 從第二次載入開始只載入最大/最新的ID或時間點以後的資料。當載入過程全部成功完成之後並同步更新增量資料庫或記錄檔案,更新這次資料記錄的最後記錄ID或時間點。

一般這類資料記錄表有自增長列,那麼也可以使用自增長列來實現這個標識特徵。比如本次我用到資料表增長列F_ID。

class IncrementalRecordServer:
    _servers = []
    _instance = None
    def __new__(cls, *args, **kwargs):
        if not IncrementalRecordServer._instance:
            # IncrementalRecordServer._instance = super().__new__(cls)
            IncrementalRecordServer._instance = super(IncrementalRecordServer,cls).__new__(cls)
        return IncrementalRecordServer._instance

    def __init__(self,changeServersID=None):

        """
        變數初始化過程
        """
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID

    # 回撥更新本地記錄,清空記錄替換,臨時記錄
    def record(func):
        def Server_record(self):
            v = func(self)
            text_save(filename=r"F:AutoOps_platformDatabaseServer_record.txt",record=IncrementalRecordServer._servers)
            print("儲存成功")

            return v
        return Server_record

    #增加服務記錄
    @record
    def addServer(self):
        self._servers.append([int(self.F_SDaqID_MAX),self.record_date])
        print("新增記錄")
        Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX, f2=self.record_date)

    #修改服務記錄
    @record
    def changeServers(self):
        # self._servers.pop()
        # 此處傳入手動修改的記錄ID
        self._servers.append([self.changeServersID,self.record_date])
        #先刪除再插入實現修改
        Database_sqlite.Del_Max_ID_Records()
        Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID, f2=self.record_date)
        print("更新記錄")

    #刪除服務記錄
    @record
    def popServers(self):
        # self._servers.pop()
        print("刪除記錄")
        Database_sqlite.Del_Max_ID_Records()

    # 最新服務記錄
    def getServers(self):
        # print(self._servers[-1])
        Max_ID_Records = Database_sqlite.View_Max_ID_Records()
        print("檢視記錄",Max_ID_Records)
        return Max_ID_Records

    #提取資料
    def Incremental_data_client(self):
        """
        # 提取資料(增量資料MAXID獲取,並提取增量資料)
        """
        # 實時資料庫
        # 第一次載入先判斷是否存在最新記錄
        if self.getServers() == None:
            # 插入增量資料庫ID
            self.addServer()
            # 提取增量資料
            data = Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX)
            return data

        # 獲取增量資料庫中已有的最新最大ID記錄
        incremental_Max_ID = self.getServers()

        #新增記錄
        self.addServer()
        # 提取增量資料
        Target_data_source = Database_sqlserver.get_incremental_data(incremental_Max_ID)

        return Target_data_source

優化策略:

1、延遲載入方式

以上增量記錄服務類IncrementalRecordServer通過覆蓋__new__方法來控制物件的建立,我們在建立物件的時候會先檢查物件是否存在。也可以通過懶載入的方式實現,節約資源優化如下。

class IncrementalRecordServer:
    _servers = []
    _instance = None

    def __init__(self,changeServersID=None):
        """
        變數初始化過程
        """
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID

        if not IncrementalRecordServer._instance:
            print("__init__物件建立")
        else:
            print("物件已經存在:",IncrementalRecordServer._instance)
            self.getInstance()

    @classmethod
    def getInstance(cls):
        if not cls._instance:
            cls._instance = IncrementalRecordServer()
        return cls._instance

懶漢式範例化能夠確保實際需要時才建立物件,範例化a= IncrementalRecordServer()時,呼叫初始化__init__方法,但是沒有新的物件建立。懶漢式這種方式載入類物件,也稱為延遲載入方式。

2、單例模式能有效利用空間資源,每次利用同一空間資源。

不同操作物件的記憶體地址相同,且不同物件初始化將上一個物件初始化變數覆蓋,確保最新記錄實時更新。表面上以上程式碼實現了單例模式沒問題,但多執行緒並行情況下,存線上程安全問題,可能同時建立不同的物件空間。考慮到執行緒安全,也可以進一步加鎖處理.

3、適用範圍及注意事項

本次程式碼適用於部署生產指定時間點執行之後產出的增量資料,長時間未啟用再啟動需要清空歷史記錄即增量資料庫或檔案ID需清空,一般實時資料增量實現一次載入沒有什麼問題,所以這一點也不用很關注(檔案方式程式碼可自行完善);當載入歷史資料庫或定時間隔產生資料量過大時,需要進一步修改程式碼,需要判斷資料規模,指定起始節點及載入資料量,綜合因素考慮,下次分享一下億級資料量提取方案。

4、進一步瞭解Python垃圾回收機制;並行情況下,通過優化執行緒池來管理資源。

最後可以新增一個函數來釋放資源

def __del__(self):
    class_name = self.__class__.__name__
    print(class_name,"銷燬")

del obj 呼叫__del__() 銷燬物件,釋放其空間;只有Python 物件在不再參照物件時被釋放。當程式中有其它變數參照該範例物件時,即便手動呼叫 __del__() 方法,該方法也不會立即執行。這和 Python 的垃圾回收機制的實現有關。

結果測試

if __name__ == '__main__':
    for i in range(6):
        hc1 = IncrementalRecordServer()
        hc1.addServer()
        print("Record_ID",hc1._servers[i])
        # del hc1
        time.sleep(60)

    #Server2-使用者端client
    # 最新服務記錄
    hc2 = IncrementalRecordServer()
    hc2.getServers()
    #檢視增量資料
    hc2.Incremental_data_client()

插入記錄

模擬每1分鐘插入一條記錄,向增量資料庫插入7條

if __name__ == '__main__':
    # Server3-使用者端client
    # 手動新增增量起始ID記錄
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    hc3.changeServers()

if __name__ == '__main__':
    #刪除ID
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    # hc3.changeServers()
    hc3.popServers()

以上就是Python實現實時增量資料載入工具的解決方案的詳細內容,更多關於Python增量資料載入的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com