首頁 > 軟體

python如何使用contextvars模組原始碼分析

2022-06-27 18:01:49

前記

在Python3.7後官方庫出現了contextvars模組, 它的主要功能就是可以為多執行緒以及asyncio生態新增上下文功能,即使程式在多個協程並行執行的情況下,也能呼叫到程式的上下文變數, 從而使我們的邏輯解耦.

上下文,可以理解為我們說話的語境, 在聊天的過程中, 有些話脫離了特定的語境,他的意思就變了,程式的執行也是如此.線上程中也是有他的上下文,只不過稱為堆疊,如在python中就是儲存在thread.local變數中,而協程也有他自己的上下文,但是沒有暴露出來,不過有了contextvars模組後我們可以通過contextvars模組去儲存與讀取.

使用contextvars的好處不僅可以防止’一個變數傳遍天’的事情發生外,還能很好的結合TypeHint,可以讓自己的程式碼可以被mypy以及IDE檢查,讓自己的程式碼更加適應工程化.
不過用了contextvars後會多了一些隱性的呼叫, 需要解決好這些隱性的成本.

更新說明

  • 切換web框架sanicstarlette
  • 增加一個自己編寫且可用於starlette,fastapi的context說明
  • 更新fast_tools.context的最新範例以及簡單的修改行文。

1.有無上下文傳變數的區別

如果有用過Flask框架, 就知道了Flask擁有自己的上下文功能, 而contextvars跟它很像, 而且還增加了對asyncio的上下文提供支援。
Flask的上下文是基於threading.local實現的, threading.local的隔離效果很好,但是他是隻針對執行緒的,只隔離執行緒之間的資料狀態, 而werkzeug為了支援在gevent中執行,自己實現了一個Local變數, 常用的Flask上下文變數request的例子如下:

from flask import Flask, request
app = Flask(__name__)
@app.route('/')
def root():
    so1n_name = request.get('so1n_name')
    return f'Name is {so1n_name}'

拓展閱讀:關於Flask 上下文詳細介紹

與之相比的是Python的另一個經典Web框架Djano, 它沒有上下文的支援, 所以只能顯示的傳request物件, 例子如下:

from django.http import HttpResponse
def root(request):
    so1n_name = request.get('so1n_name')
    return HttpResponse(f'Name is {so1n_name}')

通過上面兩者的對比可以發現, 在Django中,我們需要顯示的傳一個叫request的變數,而Flask則是import一個叫request的全域性變數,並在檢視中直接使用,達到解耦的目的.

可能會有人說, 也就是傳個變數的區別,為了省傳這個變數,而花許多功夫去維護一個上下文變數,有點不值得,那可以看看下面的例子,如果層次多就會出現’一個引數傳一天’的情況(不過分層做的好或者需求不坑爹一般不會出現像下面的情況,一個好的程式設計師能做好程式碼的分層, 但可能也有出現一堆爛需求的時候)

# 虛擬碼,舉個例子一個request傳了3個函數
from django.http import HttpResponse
def is_allow(request, uid):
    if request.ip == '127.0.0.1' and check_permissions(uid):
        return True
    else:
        return False
def check_permissions(request, uid):
    pass

def root(request):
    user_id = request.GET.get('uid')
    if is_allow(request, id):
    	return HttpResponse('ok')
    else
        return HttpResponse('error')

此外, 除了防止一個引數傳一天這個問題外, 通過上下文, 可以進行一些解耦, 比如有一個最經典的技術業務需求就是在紀錄檔列印request_id, 從而方便鏈路排查, 這時候如果有上下文模組, 就可以把讀寫request_id給解耦出來, 比如下面這個基於Flask框架讀寫request_id的例子:

import logging
from typing import Any
from flask import g  # type: ignore
from flask.logging import default_handler
# 這是一個Python logging.Filter的物件, 紀錄檔在生成之前會經過Filter步驟, 這時候我們可以為他繫結request_id變數
class RequestIDLogFilter(logging.Filter):
    """
    Log filter to inject the current request id of the request under `log_record.request_id`
    """
    def filter(self, record: Any) -> Any:
        record.request_id = g.request_id or None
        return record
# 設定紀錄檔的format格式, 這裡多配了一個request_id變數
format_string: str = (
    "[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s:%(request_id)s]" " %(message)s"
)
# 為flask的預設logger設定format和增加一個logging.Filter物件
default_handler.setFormatter(logging.Formatter(format_string))
default_handler.addFilter(RequestIDLogFilter())
# 該方法用於設定request_id
def set_request_id() -> None:
    g.request_id = request.headers.get("X-Request-Id", str(uuid4()))
# 初始化FLask物件, 並設定before_request
app: Flask = Flask("demo")
app.before_request(set_request_id)

2.如何使用contextvars模組

這裡舉了一個例子, 但這個例子也有別的解決方案. 只不過通過這個例子順便說如何使用contextvar模組

首先看看未使用contextvars時,asyncio的web框架是如何傳變數的,根據starlette的檔案,在未使用contextvars時,傳遞Redis使用者端範例的辦法是通過request.stat這個變數儲存Redis使用者端的範例,改寫程式碼如下:

# demo/web_tools.py
# 通過中介軟體把變數給存進去
class RequestContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(
            self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        request.stat.redis = REDIS_POOL
        response = await call_next(request)
        return response
# demo/server.py
# 呼叫變數
@APP.route('/')
async def homepage(request):
    # 虛擬碼,這裡是執行redis命令
    await request.stat.redis.execute()
    return JSONResponse({'hello': 'world'})

程式碼非常簡便, 也可以正常的執行, 但你下次在重構時, 比如簡單的把redis這個變數名改為new_redis, 那IDE不會識別出來, 需要一個一個改。 同時, 在寫程式碼的時候, IDE永遠不知道這個方法呼叫到的變數的型別是什麼, IDE也無法智慧的幫你檢查(如輸入request.stat.redis.時,IDE不會出現execute,或者出錯時,IDE並不會提示). 這非常不利於專案的工程化, 而通過contextvarsTypeHints, 恰好能解決這個問題.

說了那麼多, 下面以一個Redis client為例子,展示如何在asyncio生態中使用contextvars, 並引入TypeHints(詳細解釋見程式碼).

# demo/context.py
# 該檔案存放contextvars相關
import contextvars
if TYPE_CHECKING:
    from demo.redis_dal import RDS  # 這裡是一個redis的封裝範例
# 初始化一個redis相關的全域性context
redis_pool_context = contextvars.ContextVar('redis_pool')
# 通過函數呼叫可以獲取到當前協程執行時的context上下文
def get_redis() -> 'RDS':
    return redis_pool_context.get()
# demo/web_tool.py
# 該檔案存放starlette相關模組
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.middleware.base import RequestResponseEndpoint
from starlette.responses import Response
from demo.redis_dal import RDS
# 初始化一個redis使用者端變數,當前為空
REDIS_POOL = None  # type: Optional[RDS]
class RequestContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(
            self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        # 通過中介軟體,在進入路由之前,把redis使用者端放入當前協程的上下文之中
        token = redis_pool_context.set(REDIS_POOL)
        try:
        	response = await call_next(request)
            return response
        finally:
        	# 呼叫完成,回收當前請求設定的redis使用者端的上下文
            redis_pool_context.reset(token)
async def startup_event() -> None:
    global REDIS_POOL
    REDIS_POOL = RDS() # 初始化使用者端,裡面通過asyncio.ensure_future邏輯延後連線
async def shutdown_event() -> None:
    if REDIS_POOL:
        await REDIS_POOL.close() # 關閉redis使用者端
# demo/server.py
# 該檔案存放starlette main邏輯
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from demo.web_tool import RequestContextMiddleware
from demo.context import get_redis
APP = Starlette()
APP.add_middleware(RequestContextMiddleware)
@APP.route('/')
async def homepage(request):
    # 虛擬碼,這裡是執行redis命令
    # 只要驗證 id(get_redis())等於demo.web_tool裡REDID_POOL的id一致,那證明contextvars可以為asyncio維護一套上下文狀態
    await get_redis().execute()
    return JSONResponse({'hello': 'world'})

3.如何優雅的使用contextvars

從上面的範例程式碼來看, 使用contextvarTypeHint確實能讓讓IDE可以識別到這個變數是什麼了, 但增加的程式碼太多了,更恐怖的是, 每多一個變數,就需要自己去寫一個context,一個變數的初始化,一個變數的get函數,同時在參照時使用函數會比較彆扭.

自己在使用了contextvars一段時間後,覺得這樣太麻煩了,每次都要做一堆重複的操作,且平時使用最多的就是把一個範例或者提煉出Headers的引數放入contextvars中,所以寫了一個封裝fast_tools.context(同時相容fastapistarlette), 它能遮蔽所有與contextvars的相關邏輯,其中由ContextModel負責contextvars的set和get操作,ContextMiddleware管理contextvars的週期,HeaderHeader負責託管Headers相關的引數, 呼叫者只需要在ContextModel中寫入自己需要的變數,參照時呼叫ContextModel的屬性即可.

以下是呼叫者的程式碼範例, 這裡的範例化變數由一個http client代替, 且都會每次請求分配一個使用者端範例, 但在實際使用中並不會為每一個請求都分配一個使用者端範例, 很影響效能:

import asyncio
import uuid
from contextvars import Context, copy_context
from functools import partial
from typing import Optional, Set
import httpx
from fastapi import FastAPI, Request, Response
from fast_tools.context import ContextBaseModel, ContextMiddleware, HeaderHelper
app: FastAPI = FastAPI()
check_set: Set[int] = set()
class ContextModel(ContextBaseModel):
    """
	通過該範例可以遮蔽大部分與contextvars相關的操作,如果要新增一個變數,則在該範例新增一個屬性即可.
	屬性必須要使用Type Hints的寫法,不然不會識別(強制使用Type Hints)
    """
    # 用於把自己的範例(如上文所說的redis使用者端)存放於contextvars中
    http_client: httpx.AsyncClient
    # HeaderHepler用於把header的變數存放於contextvars中
    request_id: str = HeaderHelper.i("X-Request-Id", default_func=lambda request: str(uuid.uuid4()))
    ip: str = HeaderHelper.i("X-Real-IP", default_func=lambda request: request.client.host)
    user_agent: str = HeaderHelper.i("User-Agent")

    async def before_request(self, request: Request) -> None:
        # 請求之前的勾點, 通過該勾點可以設定自己的變數
        self.http_client = httpx.AsyncClient()
        check_set.add(id(self.http_client))

    async def before_reset_context(self, request: Request, response: Optional[Response]) -> None:
        # 準備退出中介軟體的勾點, 這步奏後會清掉上下文
        await self.http_client.aclose()
context_model: ContextModel = ContextModel()
app.add_middleware(ContextMiddleware, context_model=context_model)
async def test_ensure_future() -> None:
    assert id(context_model.http_client) in check_set
def test_run_in_executor() -> None:
    assert id(context_model.http_client) in check_set
def test_call_soon() -> None:
    assert id(context_model.http_client) in check_set
@app.get("/")
async def root() -> dict:
    # 在使用asyncio.ensure_future開啟另外一個子協程跑任務時, 也可以複用上下文
    asyncio.ensure_future(test_ensure_future())
    loop: "asyncio.AbstractEventLoop" = asyncio.get_event_loop()
    # 使用call_soon也能複用上下文
    loop.call_soon(test_call_soon)
    # 使用run_in_executor也能複用上下文, 但必須使用上下文的run方法, copy_context表示複製當前的上下文
    ctx: Context = copy_context()
    await loop.run_in_executor(None, partial(ctx.run, test_run_in_executor))  # type: ignore
    return {
        "message": context_model.to_dict(is_safe_return=True),  # not return CustomQuery
        "client_id": id(context_model.http_client),
    }
if __name__ == "__main__":
    import uvicorn  # type: ignore
    uvicorn.run(app)

可以從例子中看到, 通過封裝的上下文呼叫會變得非常愉快, 只要通過一兩步方法就能設定好自己的上下文屬性, 同時不用考慮如何編寫上下文的生命週期. 另外也能通過這個例子看出, 在asyncio生態中, contextvars能運用到包括子協程, 多執行緒等所有的場景中.

4.contextvars的原理

在第一次使用時,我就很好奇contextvars是如何去維護程式的上下文的,好在contextvars的作者出了一個向下相容的contextvars庫,雖然他不支援asyncio,但我們還是可以通過程式碼瞭解到他的基本原理.

4.1 ContextMeta,ContextVarMeta和TokenMeta

程式碼倉中有ContextMeta,ContextVarMetaTokenMeta這幾個物件, 它們的功能都是防止使用者來繼承Context,ContextVarToken,原理都是通過元類來判斷類名是否是自己編寫類的名稱,如果不是則拋錯.

class ContextMeta(type(collections.abc.Mapping)):
    # contextvars.Context is not subclassable.
    def __new__(mcls, names, bases, dct):
        cls = super().__new__(mcls, names, bases, dct)
        if cls.__module__ != 'contextvars' or cls.__name__ != 'Context':
            raise TypeError("type 'Context' is not an acceptable base type")
        return cls

4.2 Token

上下文的本質是一個堆疊, 每次set一次物件就向堆疊增加一層資料, 每次reset就是pop掉最上層的資料, 而在Contextvars中, 通過Token物件來維護堆疊之間的互動.

class Token(metaclass=TokenMeta):
    MISSING = object()
    def __init__(self, context, var, old_value):
        # 分別存放上下文變數, 當前set的資料以及上次set的資料
        self._context = context
        self._var = var
        self._old_value = old_value
        self._used = False
    @property
    def var(self):
        return self._var
    @property
    def old_value(self):
        return self._old_value
    def __repr__(self):
        r = '<Token '
        if self._used:
            r += ' used'
        r += ' var={!r} at {:0x}>'.format(self._var, id(self))
        return r

可以看到Token的程式碼很少, 它只儲存當前的context變數, 本次呼叫set的資料和上一次被set的舊資料. 使用者只有在呼叫contextvar.context後才能得到Token, 返回的Token可以被使用者在呼叫context後, 通過呼叫context.reset(token)來清空儲存的上下文,方便本次context的變數能及時的被回收, 回到上上次的資料.

4.3 全域性唯一context

前面說過, Python中由threading.local()負責每個執行緒的context, 協程屬於執行緒的’子集’,所以contextvar直接基於threading.local()生成自己的全域性context. 從他的原始碼可以看到, _state就是threading.local()的參照, 並通過設定和讀取_statecontext屬性來寫入和讀取當前的上下文, copy_context呼叫也很簡單, 同樣也是呼叫到threading.local()API.

def copy_context():
    return _get_context().copy()
def _get_context():
    ctx = getattr(_state, 'context', None)
    if ctx is None:
        ctx = Context()
        _state.context = ctx
    return ctx
def _set_context(ctx):
    _state.context = ctx
_state = threading.local()

關於threading.local(),雖然不是本文重點,但由於contextvars是基於threading.local()進行封裝的,所以還是要明白threading.local()的原理,這裡並不直接通過原始碼分析, 而是做一個簡單的範例解釋.

在一個執行緒裡面使用執行緒的區域性變數會比直接使用全域性變數的效能好,因為區域性變數只有執行緒自己能看見,不會影響其他執行緒,而全域性變數的修改必須加鎖, 效能會變得很差, 比如下面全域性變數的例子:

pet_dict = {}
def get_pet(pet_name):
    return pet_dict[pet_name]
def set_pet(pet_name):
    return pet_dict[pet_name]

這份程式碼就是模仿一個簡單的全域性變數呼叫, 如果是多執行緒呼叫的話, 那就需要加鎖啦, 每次在讀寫之前都要等到持有鎖的執行緒放棄了鎖後再去競爭, 而且還可能汙染到了別的執行緒存放的資料.

而執行緒的區域性變數則是讓每個執行緒有一個自己的pet_dict, 假設每個執行緒呼叫get_pet,set_pet時,都會把自己的pid傳入進來, 那麼就可以避免多個執行緒去同時競爭資源, 同時也不會汙染到別的執行緒的資料, 那麼程式碼可以改為這樣子:

pet_dict = {}
def get_pet(pet_name, pid):
    return pet_dict[pid][pet_name]
def set_pet(pet_name, pid):
    return pet_dict[pid][pet_name]

不過這樣子使用起來非常方便, 同時範例例子沒有對異常檢查和初始化等處理, 如果值比較複雜, 我們還要維護異常狀況, 這樣太麻煩了.

這時候threading.local()就應運而生了,他負責幫我們處理這些維護的工作,我們只要對他進行一些呼叫即可,呼叫起來跟單執行緒呼叫一樣簡單方便, 應用threading.local()後的程式碼如下:

import threading
thread_local=threading.local()
def get_pet(pet_name):
    return thread_local[pet_name]
def set_pet(pet_name):
    return thread_local[pet_name]

可以看到程式碼就像呼叫全域性變數一樣, 但是又不會產生競爭狀態。

4.4contextvar自己封裝的Context

contextvars自己封裝的Context比較簡單, 這裡只展示他的兩個核心方法(其他的魔術方法就像dict的魔術方法一樣):

class Context(collections.abc.Mapping, metaclass=ContextMeta):
    def __init__(self):
        self._data = immutables.Map()
        self._prev_context = None
    def run(self, callable, *args, **kwargs):
        if self._prev_context is not None:
            raise RuntimeError(
                'cannot enter context: {} is already entered'.format(self))
        self._prev_context = _get_context()
        try:
            _set_context(self)
            return callable(*args, **kwargs)
        finally:
            _set_context(self._prev_context)
            self._prev_context = None
    def copy(self):
        new = Context()
        new._data = self._data
        return new

首先, 在__init__方法可以看到self._data,這裡使用到了一個叫immutables.Map()的不可變物件,並對immutables.Map()進行一些封裝,所以context可以看成一個不可變的dict。這樣可以防止呼叫copy方法後得到的上下文的變動會影響到了原本的上下文變數。

檢視immutables.Map()的範例程式碼可以看到,每次對原物件的修改時,原物件並不會發生改變,並會返回一個已經發生改變的新物件.

map2 = map.set('a', 10)
print(map, map2)
# will print:
#   <immutables.Map({'a': 1, 'b': 2})>
#   <immutables.Map({'a': 10, 'b': 2})>
map3 = map2.delete('b')
print(map, map2, map3)
# will print:
#   <immutables.Map({'a': 1, 'b': 2})>
#   <immutables.Map({'a': 10, 'b': 2})>
#   <immutables.Map({'a': 10})>

此外,context還有一個叫run的方法, 上面在執行loop.run_in_executor時就用過run方法, 目的就是可以產生一個新的上下文變數給另外一個執行緒使用, 同時這個新的上下文變數跟原來的上下文變數是一致的.
執行run的時候,可以看出會copy一個新的上下文來呼叫傳入的函數, 由於immutables.Map的存在, 函數中對上下文的修改並不會影響舊的上下文變數, 達到程序複製資料時的寫時複製的目的. 在run方法的最後, 函數執行完了會再次set舊的上下文, 從而完成一次上下文切換.

def run(self, callable, *args, **kwargs):
    # 已經存在舊的context,丟擲異常,防止多執行緒迴圈呼叫
    if self._prev_context is not None:
        raise RuntimeError(
            'cannot enter context: {} is already entered'.format(self))
    self._prev_context = _get_context()  # 儲存當前的context
    try:
        _set_context(self) # 設定新的context
        return callable(*args, **kwargs)  # 執行函數
    finally:
        _set_context(self._prev_context)  # 設定為舊的context
        self._prev_context = None

4.5 ContextVar

我們一般在使用contextvars模組時,經常使用的就是ContextVar這個類了,這個類很簡單,主要提供了set–設定值,get–獲取值,reset–重置值三個方法, 從Context類中寫入和獲取值, 而set和reset的就是通過上面的token類進行互動的.

set – 為當前上下文設定變數

def set(self, value):
    ctx = _get_context()  # 獲取當前上下文物件`Context`
    data = ctx._data
    try:
        old_value = data[self]  # 獲取Context舊物件
    except KeyError:
        old_value = Token.MISSING  # 獲取不到則填充一個object(全域性唯一)
    updated_data = data.set(self, value) # 設定新的值
    ctx._data = updated_data
    return Token(ctx, self, old_value) # 返回帶有舊值的token

get – 從當前上下文獲取變數

def get(self, default=_NO_DEFAULT):
    ctx = _get_context()  # 獲取當前上下文物件`Context`
    try:
        return ctx[self]  # 返回獲取的值
    except KeyError:
        pass
    if default is not _NO_DEFAULT:
        return default    # 返回撥用get時設定的值
    if self._default is not _NO_DEFAULT:
        return self._default  # 返回初始化context時設定的預設值
    raise LookupError  # 都沒有則會拋錯

reset – 清理本次用到的上下文資料

def reset(self, token):
       if token._used:
       	# 判斷token是否已經被使用
           raise RuntimeError("Token has already been used once")
       if token._var is not self:
       	# 判斷token是否是當前contextvar返回的
           raise ValueError(
               "Token was created by a different ContextVar")
       if token._context is not _get_context():
       	# 判斷token的上下文是否跟contextvar上下文一致
           raise ValueError(
               "Token was created in a different Context")
       ctx = token._context
       if token._old_value is Token.MISSING:
       	# 如果沒有舊值則刪除該值
           ctx._data = ctx._data.delete(token._var)
       else:
       	# 有舊值則當前contextvar變為舊值
           ctx._data = ctx._data.set(token._var, token._old_value)
       token._used = True  # 設定flag,標記token已經被使用了

則此,contextvar的原理了解完了,接下來再看看他是如何在asyncio執行的.

5.contextvars asyncio

由於向下相容的contextvars並不支援asyncio, 所以這裡通過aiotask-context的原始碼簡要的瞭解如何在asyncio中如何獲取和設定context。

5.1在asyncio中獲取context

相比起contextvars複雜的概念,在asyncio中,我們可以很簡單的獲取到當前協程的task, 然後通過task就可以很方便的獲取到task的context了,由於Pyhon3.7對asyncio的高階API 重新設計,所以可以看到需要對獲取當前task進行封裝

PY37 = sys.version_info >= (3, 7)
if PY37:
    def asyncio_current_task(loop=None):
        """Return the current task or None."""
        try:
            return asyncio.current_task(loop)
        except RuntimeError:
            # simulate old behaviour
            return None
else:
    asyncio_current_task = asyncio.Task.current_task

不同的版本有不同的獲取task方法, 之後我們就可以通過呼叫asyncio_current_task().context即可獲取到當前的上下文了…

5.2 對上下文的操作

同樣的,在得到上下文後, 我們這裡也需要set, get, reset的操作,不過十分簡單, 類似dict一樣的操作即可, 它沒有token的邏輯:

set

def set(key, value):
    """
    Sets the given value inside Task.context[key]. If the key does not exist it creates it.
    :param key: identifier for accessing the context dict.
    :param value: value to store inside context[key].
    :raises
    """
    current_task = asyncio_current_task()
    if not current_task:
        raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))

    current_task.context[key] = value

get

def get(key, default=None):
    """
    Retrieves the value stored in key from the Task.context dict. If key does not exist,
    or there is no event loop running, default will be returned
    :param key: identifier for accessing the context dict.
    :param default: None by default, returned in case key is not found.
    :return: Value stored inside the dict[key].
    """
    current_task = asyncio_current_task()
    if not current_task:
        raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))
    return current_task.context.get(key, default)

clear – 也就是contextvar.ContextVars中的reset

def clear():
    """
    Clear the Task.context.
    :raises ValueError: if no current task.
    """
    current_task = asyncio_current_task()
    if not current_task:
        raise ValueError("No event loop found")
    current_task.context.clear()

5.2 copying_task_factory和chainmap_task_factory

在Python的更高階版本中,已經支援設定context了,所以這兩個方法可以不再使用了.他們最後都用到了task_factory的方法.
task_factory簡單說就是建立一個新的task,再通過工廠方法合成context,最後把context設定到task

def task_factory(loop, coro, copy_context=False, context_factory=None):
    """
    By default returns a task factory that uses a simple dict as the task context,
    but allows context creation and inheritance to be customized via ``context_factory``.
    """
    # 生成context工廠函數
    context_factory = context_factory or partial(
        dict_context_factory, copy_context=copy_context)
    # 建立task, 跟asyncio.ensure_future一樣
    task = asyncio.tasks.Task(coro, loop=loop)
    if task._source_traceback:
        del [-1]

    # 獲取task的context
    try:
        context = asyncio_current_task(loop=loop).context
    except AttributeError:
        context = None
    # 從context工廠中處理context並賦值在task
    task.context = context_factory(context)
    return task

aiotask-context提供了兩個對context處理的函數dict_context_factorychainmap_context_factory.在aiotask-context中,context是一個dict物件,dict_context_factory可以選擇賦值或者設定新的context

def dict_context_factory(parent_context=None, copy_context=False):
    """A traditional ``dict`` context to keep things simple"""
    if parent_context is None:
        # initial context
        return {}
    else:
        # inherit context
        new_context = parent_context
        if copy_context:
            new_context = deepcopy(new_context)
        return new_context

chainmap_context_factorydict_context_factory的區別就是在合併context而不是直接繼承.同時借用ChainMap保證合併context後,還能同步context的改變

def chainmap_context_factory(parent_context=None):
    """
    A ``ChainMap`` context, to avoid copying any data
    and yet preserve strict one-way inheritance
    (just like with dict copying)
    """
    if parent_context is None:
        # initial context
        return ChainMap()
    else:
        # inherit context
        if not isinstance(parent_context, ChainMap):
            # if a dict context was previously used, then convert
            # (without modifying the original dict)
            parent_context = ChainMap(parent_context)
        return parent_context.new_child()

至此, asyncio中context的呼叫就簡單的分析完了, 如果想要深入的瞭解asyncio是怎麼傳上下文的, 可以檢視asyncio都原始碼.

6.總結

contextvars本身原理很簡單,但他可以讓我們呼叫起來更加方便便捷,減少我們的傳參次數,同時還可以結合TypeHint使專案更加工成化, 但是還是仁者見仁. 不過在使用時最好能加上一層封裝, 最好的實踐應該是一個協程共用同一個context而不是每個變數一個context.

到此這篇關於python如何使用contextvars模組原始碼分析的文章就介紹到這了,更多相關python contextvars 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com