首頁 > 軟體

Python執行時修改業務SQL程式碼

2022-06-27 18:01:32

前記

在專案的演變過程中,有時可能會誕生一些需要奇怪的臨時需求,這些需求會涉及到所有的SQL,但開發時間上卻不允許整個專案的所有SQL進行重寫,比如控制不同的人存取表的許可權,或者是我面對的SASS化需求,這時就需要在執行時根據對應的條件來修改SQL語句。

1.緣起

最近專案在準備搞SASS化,SASS化有一個特點就是多租戶,且每個租戶之間的資料都要隔離,對於資料庫的隔離方案常見的有資料庫隔離,表隔離,欄位隔離,目前我只用到表隔離和欄位隔離(資料庫隔離的原理也是差不多)。 對於欄位隔離比較簡單,就是查詢條件不同而已,比如像下面的SQL查詢:

SELECT * FROM t_demo WHERE tenant_id='xxx' AND is_del=0

但是為了嚴謹,需求上需要在執行SQL之前檢查對應的表是否帶上tenant_id的查詢欄位。

對於表隔離就麻煩了一些,他需要做到在執行的時候根據對應的租戶ID來處理某個資料表,舉個例子,假如有下面這樣的一條SQL查詢:

SELECT * FROM t_demo WHERE is_del=0

在遇到租戶A時,SQL查詢將變為:

SELECT * FROM t_demo_a WHERE is_del=0

在遇到租戶B時,SQL查詢將變為:

SELECT * FROM t_demo_b WHERE is_del=0

如果商戶數量固定時,一般在程式碼裡編寫if-else來判斷就可以了,但是常見的SASS化應用的商戶是會一直新增的,那麼對於這個SQL邏輯就會變成這樣:

def sql_handle(tenant_id: str):
    table_name: str = f"t_demo_{tenant_id}"
    sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"

但是這有幾個問題,對於ORM來說,一開始只建立一個t_demo對應的表物件就可以了,現在卻要根據多個商戶建立多個表物件,這是不現實的,其次如果是裸寫SQL,一般會使用IDE的檢查,而對於這樣的SQL:

sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"

IDE是沒辦法進行檢查的,當然還有一個最為嚴重的問題,就是當前的專案已經非常龐大了,如果每個相關表的呼叫都進行適配更改的話,那工程量就非常龐大了,所以最好的方案就是在引擎庫得到使用者傳過來的SQL語句後且還沒傳送到MySQL伺服器之前自動的根據商戶ID更改SQL, 而要達到這樣的效果,就必須侵入到我們使用的MySQL的引擎庫,修改裡面的方法來相容我們的需求。

不管是使用dbutils還是sqlalchemy,都可以指定一個引擎庫,目前常用的引擎庫是pymysql,所以下文都將以pymysql為例進行闡述。

2.侵入庫

由於必須侵入到我們使用的引擎庫,所以我們應該先判斷我們需要修改引擎庫的哪個方法,在經過原始碼閱讀後,我判定只要更改pymysql.cursors.Cursormogrify方法:

def mogrify(self, query, args=None):
    """
    Returns the exact string that is sent to the database by calling the
    execute() method.

    This method follows the extension to the DB API 2.0 followed by Psycopg.
    """
    conn = self._get_db()

    if args is not None:
        query = query % self._escape_args(args, conn)
    return query

這個方法的作用就是把使用者傳過來的SQL和引數進行整合,生成一個最終的SQL,剛好符合我們的需求,於是可以通過繼承的思路來建立一個新的屬於我們自己的Cursor類:

import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        # 在此可以編寫處理還合成的SQL邏輯
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以編寫處理合成後的SQL邏輯
        return mogrify_sql
class DictCursor(pymysql.cursors.DictCursorMixin, Cursor):
    """A cursor which returns results as a dictionary"""
    # 直接修改Cursor類的`mogrify`方法並不會影響到`DictCursor`類,所以我們也要建立一個新的`Cursor`類。

建立好了Cursor類後,就需要考慮如何在pymysql中應用我們自定義的Cursor類了,一般的Mysql連線庫都支援我們傳入自定義的Cursor類,比如pymysql:

import pymysql.cursors
# Connect to the database
connection = pymysql.connect(
    host='localhost',
    user='user',
    password='passwd',
    database='db',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)

我們可以通過cursorclass來指定我們的Cursor類,如果使用的庫不支援或者是其它原因則需要使用猴子修補程式的方法,具體的使用方法見Python探針完成呼叫庫的資料提取

3.獲取商戶ID

現在我們已經搞定了在何處修改SQL的問題了,接下來就要思考如何在mogrify方法獲取到商戶ID以及那些表要進行替換,一般我們在進行一段程式碼呼叫時,有兩種傳引數的方法, 一種是傳陣列型別的引數:

with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))

一種是傳字典型別的引數:

with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM t_demo WHERE is_del=%(is_del)s", {"is_del": 0})

目前大多數的專案都存在這兩種型別的編寫習慣,而引擎庫在執行execute時會經過處理後才把引數sqlargs傳給了mogrify,如果我們是使用字典型別的引數,那麼可以在裡面嵌入我們需要的引數,並在mogrify裡面提取出來,但是使用了陣列型別的引數或者是ORM庫的話就比較難傳遞引數給mogrify方法了,這時可以通過context隱式的把引數傳給mogrify方法,具體的分析和原理可見:python如何使用contextvars模組原始碼分析

context的使用方法很簡單, 首先是建立一個context封裝的類:

from contextvars import ContextVar, Token
from typing import Any, Dict, Optional, Set
context: ContextVar[Dict[str, Any]] = ContextVar("context", default={})
class Context(object):
    """基礎的context呼叫,支援Type Hints檢查"""
    tenant_id: str
    replace_table_set: Set[str]
    def __getattr__(self, key: str) -> Any:
        value: Any = context.get().get(key)
        return value
    def __setattr__(self, key: str, value: Any) -> None:
        context.get()[key] = value
class WithContext(Context):
    """簡單的處理reset token邏輯,和context管理,只用在業務程式碼"""
    def __init__(self) -> None:
        self._token: Optional[Token] = None

    def __enter__(self) -> "WithContext":
        self._token = context.set({})
        return self
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        if self._token:
            context.reset(self._token)
            self._token = None

接下來在業務程式碼中,通過context傳入當前業務對應的引數:

with WithContext as context:
    context.tenant_id = "xxx"
    context.replace_table_set = {"t_demo"}
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))

然後在mogrify中通過呼叫context即可獲得對應的引數了:

import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # 在此可以編寫處理還合成的SQL邏輯
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以編寫處理合成後的SQL邏輯
        return mogrify_sql

4.修改SQL

現在,萬事俱備,只剩下修改SQL的邏輯,之前在做別的專案的時候,建的表都是十分的規範,它們是以t_xxx的格式給表命名,這樣一來替換表名十分方便,只要進行兩次替換就可以相容大多數情況了,程式碼如下:

import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # 簡單範例,實際上正則的效率會更好
        for replace_table in replace_table_set:
            if replace_table in query:
                # 替換表名
                query = query.replace(f" {replace_table} ", f" {replace_table}_{tenant_id} ")
                # 替換查詢條件中帶有表名的
                query = query.replace(f" {replace_table}.", f" {replace_table}_{tenant_id}.")
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以編寫處理合成後的SQL邏輯
        return mogrify_sql

但是現在專案的SQL規範並不是很好,有些表名還是MySQL的關鍵字,所以靠簡單的替換是行不通的,同時這個需求中,一些表只需要欄位隔離,需要確保有帶上對應的欄位查詢,這就意味著必須有一個庫可以來解析SQL,並返回一些資料使我們可以比較方便的知道SQL中哪些是表名,哪些是查詢欄位了。

目前在Python中有一個比較知名的SQL解析庫--sqlparse,它可以通過解析引擎把SQL解析成一個Python物件,之後我們就可以通過一些語法來判斷哪些是SQL關鍵字, 哪些是表名,哪些是查詢條件等等。但是這個庫只實現一些底層的API,我們需要對他和SQL比較瞭解之後才能實現一些比較完備的功能,比如下面3種常見的SQL:

SELECT * FROM t_demo
SELECT * FROM t_demo as demo
SELECT * FROM t_other as other LEFT JOIN t_demo demo on demo.xxx==other.xxx

如果我們要通過sqlparse來提取表名的話就需要處理這3種情況,而我們如果要每一個情況都編寫出來的話,那將會非常費心費力,同時也可能存在遺漏的情況,這時就需要用到另外一個庫--sql_metadata,這個庫是基於sqlparse和正則的解析庫,同時提供了大量的常見使用方法的封裝,我們通過直接呼叫對應的函數就能知道SQL中有哪些表名,查詢欄位是什麼了。

目前已知這個庫有一個缺陷,就是會自動去掉欄位的符號, 比如表名為關鍵字時,我們需要使用`符號把它包起來:

SELECT * FROM `case`

但在經過sql_metadata解析後得到的表名是case而不是`case`,需要人為的處理,但是我並不覺得這是一個BUG,自己不按規範建立表,能怪誰呢。

接下來就可以通過sql_metadata的方法來實現我需要的功能了,在根據需求修改後,程式碼長這樣(說明見註釋):

from typing import Dict, Set, Tuple, Union
import pymysql
import sql_metadata
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        # 生成一個解析完成的SQL物件
        sql_parse: sql_metadata.Parser = sql_metadata.Parser(query)

        # 新加的一個屬性,這裡存下需要校驗查詢條件的表名
        check_flag = False 
        where_table_set: Set[str] = context.where_table_set
        # 該方法會獲取到SQL對應的table,返回的是一個table的陣列
        for table_name in sql_parse.tables:
            if table_name in where_table_set:
                if sql_parse.columns_dict:
                    # 該方法會返回SQL對應的欄位,其中分為select, join, where等,這裡只用到了where
                    for where_column in sql_parse.columns_dict.get("where", []):
                        # 如果連表,裡面存的是類似於t_demo.tenant_id,所以要相容這一個情況
                        if "tenant_id" in where_column.lower().split("."):
                            check_flag = True
                            break
        if not check_flag:
            # 檢查不通過就拋錯
            raise RuntimeError()
        # 更換表名的邏輯
        replace_table_set: Set[str] = context.replace_table_set
        new_query: str = query
        for table_name in sql_parse.tables:
            if table_name in replace_table_set:
                new_query = ""
                # tokens存放著解析完的資料,比如SELECT * FROM t_demo解析後是
                # [SELECT, *, FROM, t_demo]四個token
                for token in sql_parse.tokens:
                    # 判斷token是否是表名  
                    if token.is_potential_table_name:
                        # 提取規範的表名
                        parse_table_name: str = token.stringified_token.strip()
                        if parse_table_name in replace_table_set:
                            new_table_name: str = f" {parse_table_name}_{tenant_id}"
                            # next_token代表SQL的下一個欄位
                            if token.next_token.normalized != "AS":
                                # 如果當前表沒有設定別名
                                # 通過AS把替換前的表名設定為新表名的別名,這樣一來後面的表名即使沒進行更改,也是能讀到對應商戶ID的表
                                new_table_name += f" AS {parse_table_name}"
                            query += new_table_name
                            continue
                    # 通過stringified_token獲取的資料會自動帶空格,比如`FROM`得到的會是` FROM`,這樣拼接的時候就不用考慮是否加空格了
                    new_query += token.stringified_token
        mogrify_sql: str = super().mogrify(new_query, args)
        # 在此可以編寫處理合成後的SQL邏輯
        return mogrify_sql

這份程式碼十分簡單,它只做簡單介紹,事實上這段邏輯會應用到所有的SQL查詢中,我們應該要保證這段程式碼是沒問題的,同時不要有太多的效能浪費,所以在使用的時候要考慮到程式碼拆分和優化。 比如在使用的過程中可以發現,我們的SQL轉換和檢查都是在父類別的Cursor.mogrify之前進行的,這就意味著不管我們程式碼邏輯裡cursor.execute傳的引數是什麼,對於同一個程式碼邏輯來說,傳過來的query值是保持不變的,比如下面的程式碼:

def get_user_info(uid: str) -> Dict[str, Any]:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM t_user WHERE uid=%(uid)s", {"uid": uid})
        return cursor.fetchone() or {}

這段程式碼中傳到Cursor.mogrify的query永遠為SELECT * FROM t_user WHERE uid=%(uid)s,有變化的只是args中uid的不同。 有了這樣的一個前提條件,那麼我們就可以把query的校驗結果和轉換結果快取下來,減少每次都需要解析SQL再校驗造成的效能浪費。至於如何實現快取則需要根據自己的專案來決定,比如專案中只有幾百個SQL執行,那麼直接用Pythondict來存放就可以了,如果專案中執行的SQL很多,同時有些執行的頻率非常的高,有些執行的頻率非常的低,那麼可以考慮使用LRU來快取。

到此這篇關於Python執行時修改業務SQL程式碼的文章就介紹到這了,更多相關Python 修改程式碼內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com