首頁 > 軟體

pydantic-resolve巢狀資料結構生成LoaderDepend管理contextvars

2023-04-08 06:00:47

pydantic-resolve 解決巢狀資料結構的生成和其他方案的比較

pydantic-resolve

和GraphQL相比

  • GraphQL的優勢是 1.方便構建巢狀結構,2.client可以方便生成查詢子集。非常適合構建滿足靈活變化的 public API的場景.
  • 但是很多實際業務在前端做的其實是照單全收,並沒有靈活選擇的需要。GraphQL帶來的便利更多體現在靈活地構建巢狀結構。
  • GraphQL需要client端維護查詢語句,相較於通過openapi.json和工具自動生成client讓前後端無縫對接的做法,在前後端一體的架構中維護這些查詢語句,屬於重複勞動。
  • 為了滿足許可權控制的需要,通過RESTful定義一個個API 會比全域性一個Query,Mutation 控制起來更加清晰直接。
  • Pydantic-resolve 恰好滿足了靈活構建巢狀結構的需求,它不需要像GraphQL一樣引入一系列概念和設定,它非常輕量級,沒有任何侵入,所有的功能通過簡單resolve一下就實現。
  • Pydantic-resolve 在保持輕量級的同時,可以隱藏 Dataloader 的初始化邏輯,避免了GraphQL中在多處維護dataloader的麻煩。
  • Pydantic-resolve 還提供了對 global loader filters 的支援,在一些業務邏輯下可以簡化很多程式碼。如果把Dataloader 的 keys 等價視為 relationship的 join on 條件的話, 那麼 loader_filters 就類似在別處的其他過濾條件。

結論:

GraphQL更適合 public API。

前後端作為一個整體的專案,RESTful + Pydantic-resolve 才是快速靈活提供資料結構的最佳方法。

和 ORM 的 relationship相比

  • relationship 提供了ORM 級別的巢狀查詢實現,但預設會使用lazy select的方法, 會導致很多的查詢次數, 並且在非同步使用的時候需要手動宣告例如 .option(subquery(Model.field)) 之類的程式碼
  • relationship 的外來鍵決定了,無法在關聯查詢的時候提供額外的過濾條件 (即便可以也是改動成本比較大的做法)
  • relationship 最大的問題是使得 ORM Model 和 schema 產生了程式碼耦合。在schema層想做的巢狀查詢,會把邏輯侵入到ORM Model層。
  • Pydantic-resolve 則沒有這樣的問題,在 ORM 層不需要定義任何relationship,所有的join邏輯都通過 dataloader 批次查詢解決。 並且通過 global loader_filters 引數,可以提供額外的全域性過濾條件。

結論

relationship 方案的靈活度低,不方便修改,預設的用法會產生外來鍵約束。對迭代頻繁的專案不友好。

Pydantic-resolve 和 ORM 層完全解耦,可以通過靈活建立Dataloader 來滿足各種需要。

LoaderDepend的用途 背景

如果你使用過dataloader, 不論是js還是python的,都會遇到一個問題,如何為單獨的一個請求建立獨立的dataloader?

以 python 的 strawberry 來舉例子:

@strawberry.type
class User:
    id: strawberry.ID
async def load_users(keys) -> List[User]:
    return [User(id=key) for key in keys]
loader = DataLoader(load_fn=load_users)
@strawberry.type
class Query:
    @strawberry.field
    async def get_user(self, id: strawberry.ID) -> User:
        return await loader.load(id)
schema = strawberry.Schema(query=Query)

如果單獨範例化的話,會導致所有的請求都使用同一個dataloader, 由於loader本身是有快取優化機制的,所以即使內容更新之後,依然會返回快取的歷史資料。

因此 strawberry 的處理方式是:

@strawberry.type
class User:
    id: strawberry.ID
async def load_users(keys) -> List[User]:
    return [User(id=key) for key in keys]
class MyGraphQL(GraphQL):
    async def get_context(
        self, request: Union[Request, WebSocket], response: Optional[Response]
    ) -> Any:
        return {"user_loader": DataLoader(load_fn=load_users)}
@strawberry.type
class Query:
    @strawberry.field
    async def get_user(self, info: Info, id: strawberry.ID) -> User:
        return await info.context["user_loader"].load(id)
schema = strawberry.Schema(query=Query)
app = MyGraphQL(schema)

開發者需要在get_context中去初始化loader, 然後框架會負責在每次request的時候會執行初始化。 這樣每個請求就會有獨立的loader, 解決了多次請求被快取的問題。

其中的原理是:contextvars 在 await 的時候會做一次淺拷貝,所以外層的context可以被內部讀到,因此手動在最外層(request的時候) 初始化一個參照型別(dict)之後,那麼在 request 內部自然就能獲取到參照型別內的loader。

這個方法雖然好,但存在兩個問題:

  • 需要手動去維護 get_context, 每當新增了一個 DataLoader, 就需要去裡面新增, 而且實際執行 .load 的地方也要從context 裡面取loader。
  • 存在初始化了loaders卻沒有被使用到的情況,比如整個Query 有 N 個loader,但是使用者的查詢實際只用到了1個,那麼其他loader 的初始化就浪費了。而且作為公共區域東西多了之後程式碼維護會不清晰。(重要)

graphene 就更加任性了,把loader 的活交給了 aiodataloader, 如果翻閱檔案的話,會發現處理的思路也是類似的,只是需要手動去維護建立過程。

解決方法

我所期望的功能是:

  • 初始化按需執行,比如我的整個schema 裡面只存在 DataLoaderA, 那我希望只有DataLoaderA 被範例化
  • 不希望在某個reqeust或者 middleware中幹手動維護初始化。

其實這兩件事情說的是同一個問題,就是如何把初始化的事情依賴反轉到 resolve_field 方法中。

具體轉化為程式碼:

class CommentSchema(BaseModel):
    id: int
    task_id: int
    content: str
    feedbacks: List[FeedbackSchema]  = []
    def resolve_feedbacks(self, loader=LoaderDepend(FeedbackLoader)):
        return loader.load(self.id)
class TaskSchema(BaseModel):
    id: int
    name: str
    comments: List[CommentSchema]  = []
    def resolve_comments(self, loader=LoaderDepend(CommentLoader)):
        return loader.load(self.id)

就是說,我只要這樣申明好loader,其他的事情就一律不用操心。那麼,這做得到麼?

得益於pydantic-resolve 存在一個手動執行resolve的過程,於是有一個思路:

  • contextvar 是淺拷貝,所以存的如果是參照型別,那麼在最外層定義的dict,可以被所有內層讀到。可以在Resolver初始化的時候定義。
  • 假如 tasks: list[TaskSchema] 有n個,我希望在第一次遇到的時候把loader 初始化並快取,後續其他都使用快取的loader。
  • LoaderDepend 裡面存放的是 DataLoader類,做為default 引數傳入resolve_field 方法
  • 執行resolve_field之前,利用inspect.signature 分析 default 引數,執行初始化和快取的邏輯。

總體就是一個lazy的路子,到實際執行的時候去處理初始化流程。

下圖中 1 會執行LoaderA 初始化,2,3則是讀取快取, 1.1 會執行LoaderB初始化,2.1,3.1 讀取快取

程式碼如下:

class Resolver:
    def __init__(self):
        self.ctx = contextvars.ContextVar('pydantic_resolve_internal_context', default={})
    def exec_method(self, method):
        signature = inspect.signature(method)
        params = {}
        for k, v in signature.parameters.items():
            if isinstance(v.default, Depends):
                cache_key = str(v.default.dependency.__name__)
                cache = self.ctx.get()
                hit = cache.get(cache_key, None)
                if hit:
                    instance = hit
                else:
                    instance = v.default.dependency()
                    cache[cache_key] = instance
                    self.ctx.set(cache)
                params[k] = instance
        return method(**params)

遺留問題 (已經解決)

有些DataLoader的實現可能需要一個外部的查詢條件, 比如查詢使用者的absense資訊的時候,除了user_key 之外,還需要額外提供其他全域性filter 比如sprint_id)。 這種全域性變數從load引數走會顯得非常囉嗦。

這種時候就依然需要藉助contextvars 在外部設定變數。 以一段專案程式碼為例:

async def get_team_users_load(team_id: int, sprint_id: Optional[int], session: AsyncSession):
    ctx.team_id_context.set(team_id)      # set global filter
    ctx.sprint_id_context.set(sprint_id)  # set global filter
    res = await session.execute(select(User)
                                .join(UserTeam, UserTeam.user_id == User.id)
                                .filter(UserTeam.team_id == team_id))
    db_users = res.scalars()
    users = [schema.UserLoadUser(id=u.id, employee_id=u.employee_id, name=u.name) 
                for u in db_users]
    results = await Resolver().resolve(users)  # resolve
    return results
class AbsenseLoader(DataLoader):
    async def batch_load_fn(self, user_keys):
        async with async_session() as session, session.begin():
            sprint_id = ctx.sprint_id_context.get()  # read global filter
            sprint_stmt = Sprint.status == SprintStatusEnum.ongoing if not sprint_id else Sprint.id == sprint_id
            res = await session.execute(select(SprintAbsence)
                                        .join(Sprint, Sprint.id == SprintAbsence.sprint_id)
                                        .join(User, User.id == SprintAbsence.user_id)
                                        .filter(sprint_stmt)
                                        .filter(SprintAbsence.user_id.in_(user_keys)))
            rows = res.scalars().all()
            dct = {}
            for row in rows:
                dct[row.user_id] = row.hours
            return [dct.get(k, 0) for k in user_keys]

期望的設定方式為:

loader_filters = {
    AbsenseLoader: {'sprint_id': 10}, 
    OtherLoader: {field: 'value_x'}
}
results = await Resolver(loader_filters=loader_filters).resolve(users)

如果需要filter但是卻沒有設定, 該情況下要拋異常

以上就是pydantic-resolve巢狀資料結構生成LoaderDepend管理contextvars的詳細內容,更多關於LoaderDepend管理contextvars的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com