您现在的位置是:网站首页> 编程资料编程资料

python如何使用contextvars模块源码分析_python_

2023-05-26 352人已围观

简介 python如何使用contextvars模块源码分析_python_

前记

在Python3.7后官方库出现了contextvars模块, 它的主要功能就是可以为多线程以及asyncio生态添加上下文功能,即使程序在多个协程并发运行的情况下,也能调用到程序的上下文变量, 从而使我们的逻辑解耦.

上下文,可以理解为我们说话的语境, 在聊天的过程中, 有些话脱离了特定的语境,他的意思就变了,程序的运行也是如此.在线程中也是有他的上下文,只不过称为堆栈,如在python中就是保存在thread.local变量中,而协程也有他自己的上下文,但是没有暴露出来,不过有了contextvars模块后我们可以通过contextvars模块去保存与读取.

使用contextvars的好处不仅可以防止’一个变量传遍天’的事情发生外,还能很好的结合TypeHint,可以让自己的代码可以被mypy以及IDE检查,让自己的代码更加适应工程化.
不过用了contextvars后会多了一些隐性的调用, 需要解决好这些隐性的成本.

更新说明

  • 切换web框架sanicstarlette
  • 增加一个自己编写且可用于starlette,fastapi的context说明
  • 更新fast_tools.context的最新示例以及简单的修改行文。

1.有无上下文传变量的区别

如果有用过Flask框架, 就知道了Flask拥有自己的上下文功能, 而contextvars跟它很像, 而且还增加了对asyncio的上下文提供支持。
Flask的上下文是基于threading.local实现的, threading.local的隔离效果很好,但是他是只针对线程的,只隔离线程之间的数据状态, 而werkzeug为了支持在gevent中运行,自己实现了一个Local变量, 常用的Flask上下文变量request的例子如下:

from flask import Flask, request app = Flask(__name__) @app.route('/') def root(): so1n_name = request.get('so1n_name') return f'Name is {so1n_name}'

拓展阅读:关于Flask 上下文详细介绍

与之相比的是Python的另一个经典Web框架Djano, 它没有上下文的支持, 所以只能显示的传request对象, 例子如下:

from django.http import HttpResponse def root(request): so1n_name = request.get('so1n_name') return HttpResponse(f'Name is {so1n_name}')

通过上面两者的对比可以发现, 在Django中,我们需要显示的传一个叫request的变量,而Flask则是import一个叫request的全局变量,并在视图中直接使用,达到解耦的目的.

可能会有人说, 也就是传个变量的区别,为了省传这个变量,而花许多功夫去维护一个上下文变量,有点不值得,那可以看看下面的例子,如果层次多就会出现’一个参数传一天’的情况(不过分层做的好或者需求不坑爹一般不会出现像下面的情况,一个好的程序员能做好代码的分层, 但可能也有出现一堆烂需求的时候)

# 伪代码,举个例子一个request传了3个函数 from django.http import HttpResponse def is_allow(request, uid): if request.ip == '127.0.0.1' and check_permissions(uid): return True else: return False def check_permissions(request, uid): pass def root(request): user_id = request.GET.get('uid') if is_allow(request, id): return HttpResponse('ok') else return HttpResponse('error')

此外, 除了防止一个参数传一天这个问题外, 通过上下文, 可以进行一些解耦, 比如有一个最经典的技术业务需求就是在日志打印request_id, 从而方便链路排查, 这时候如果有上下文模块, 就可以把读写request_id给解耦出来, 比如下面这个基于Flask框架读写request_id的例子:

import logging from typing import Any from flask import g # type: ignore from flask.logging import default_handler # 这是一个Python logging.Filter的对象, 日志在生成之前会经过Filter步骤, 这时候我们可以为他绑定request_id变量 class RequestIDLogFilter(logging.Filter): """ Log filter to inject the current request id of the request under `log_record.request_id` """ def filter(self, record: Any) -> Any: record.request_id = g.request_id or None return record # 配置日志的format格式, 这里多配了一个request_id变量 format_string: str = ( "[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s:%(request_id)s]" " %(message)s" ) # 为flask的默认logger设置format和增加一个logging.Filter对象 default_handler.setFormatter(logging.Formatter(format_string)) default_handler.addFilter(RequestIDLogFilter()) # 该方法用于设置request_id def set_request_id() -> None: g.request_id = request.headers.get("X-Request-Id", str(uuid4())) # 初始化FLask对象, 并设置before_request app: Flask = Flask("demo") app.before_request(set_request_id)

2.如何使用contextvars模块

这里举了一个例子, 但这个例子也有别的解决方案. 只不过通过这个例子顺便说如何使用contextvar模块

首先看看未使用contextvars时,asyncio的web框架是如何传变量的,根据starlette的文档,在未使用contextvars时,传递Redis客户端实例的办法是通过request.stat这个变量保存Redis客户端的实例,改写代码如下:

# demo/web_tools.py # 通过中间件把变量给存进去 class RequestContextMiddleware(BaseHTTPMiddleware): async def dispatch( self, request: Request, call_next: RequestResponseEndpoint ) -> Response: request.stat.redis = REDIS_POOL response = await call_next(request) return response # demo/server.py # 调用变量 @APP.route('/') async def homepage(request): # 伪代码,这里是执行redis命令 await request.stat.redis.execute() return JSONResponse({'hello': 'world'})

代码非常简便, 也可以正常的运行, 但你下次在重构时, 比如简单的把redis这个变量名改为new_redis, 那IDE不会识别出来, 需要一个一个改。 同时, 在写代码的时候, IDE永远不知道这个方法调用到的变量的类型是什么, IDE也无法智能的帮你检查(如输入request.stat.redis.时,IDE不会出现execute,或者出错时,IDE并不会提示). 这非常不利于项目的工程化, 而通过contextvarsTypeHints, 恰好能解决这个问题.

说了那么多, 下面以一个Redis client为例子,展示如何在asyncio生态中使用contextvars, 并引入TypeHints(详细解释见代码).

# demo/context.py # 该文件存放contextvars相关 import contextvars if TYPE_CHECKING: from demo.redis_dal import RDS # 这里是一个redis的封装实例 # 初始化一个redis相关的全局context redis_pool_context = contextvars.ContextVar('redis_pool') # 通过函数调用可以获取到当前协程运行时的context上下文 def get_redis() -> 'RDS': return redis_pool_context.get() # demo/web_tool.py # 该文件存放starlette相关模块 from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.middleware.base import RequestResponseEndpoint from starlette.responses import Response from demo.redis_dal import RDS # 初始化一个redis客户端变量,当前为空 REDIS_POOL = None # type: Optional[RDS] class RequestContextMiddleware(BaseHTTPMiddleware): async def dispatch( self, request: Request, call_next: RequestResponseEndpoint ) -> Response: # 通过中间件,在进入路由之前,把redis客户端放入当前协程的上下文之中 token = redis_pool_context.set(REDIS_POOL) try: response = await call_next(request) return response finally: # 调用完成,回收当前请求设置的redis客户端的上下文 redis_pool_context.reset(token) async def startup_event() -> None: global REDIS_POOL REDIS_POOL = RDS() # 初始化客户端,里面通过asyncio.ensure_future逻辑延后连接 async def shutdown_event() -> None: if REDIS_POOL: await REDIS_POOL.close() # 关闭redis客户端 # demo/server.py # 该文件存放starlette main逻辑 from starlette.applications import Starlette from starlette.responses import JSONResponse from demo.web_tool import RequestContextMiddleware from demo.context import get_redis APP = Starlette() APP.add_middleware(RequestContextMiddleware) @APP.route('/') async def homepage(request): # 伪代码,这里是执行redis命令 # 只要验证 id(get_redis())等于demo.web_tool里REDID_POOL的id一致,那证明contextvars可以为asyncio维护一套上下文状态 await get_redis().execute() return JSONResponse({'hello': 'world'})

3.如何优雅的使用contextvars

从上面的示例代码来看, 使用contextvarTypeHint确实能让让IDE可以识别到这个变量是什么了, 但增加的代码太多了,更恐怖的是, 每多一个变量,就需要自己去写一个context,一个变量的初始化,一个变量的get函数,同时在引用时使用函数会比较别扭.

自己在使用了contextvars一段时间后,觉得这样太麻烦了,每次都要做一堆重复的操作,且平时使用最多的就是把一个实例或者提炼出Headers的参数放入contextvars中,所以写了一个封装fast_tools.context(同时兼容fastapistarlette), 它能屏蔽所有与contextvars的相关逻辑,其中由ContextModel负责contextvars的set和get操作,ContextMiddleware管理contextvars的周期,HeaderHeader负责托管Headers相关的参数, 调用者只需要在ContextModel中写入自己需要的变量,引用时调用ContextModel的属性即可.

以下是调用者的代码示例, 这里的实例化变量由一个http client代替, 且都会每次请求分配一个客户端实例, 但在实际使用中并不会为每一个请求都分配一个客户端实例, 很影响性能:

import asyncio import uuid from contextvars import Context, copy_context from functools import partial from typing import Optional, Set import httpx from fastapi import FastAPI, Request, Response from fast_tools.context import ContextBaseModel, ContextMiddleware, HeaderHelper app: FastAPI = FastAPI() check_set: Set[int] = set() class ContextModel(ContextBaseModel): """ 通过该实例可以屏蔽大部分与contextvars相关的操作,如果要添加一个变量,则在该实例添加一个属性即可. 属性必须要使用Type Hints的写法,不然不会识别(强制使用Type Hints) """ # 用于把自己的实例(如上文所说的redis客户端)存放于contextvars中 http_client: httpx.AsyncClient # HeaderHepler用于把header的变量存放于contextvars中 request_id: str = HeaderHelper.i("X-Request-Id", default_func=lambda request: str(uuid.uuid4())) ip: str = HeaderHelper.i("X-Real-IP", default_func=lambda request: request.client.host) user_agent: str = HeaderHelper.i("User-Agent") async def before_request(self, request: Request) -> None: # 请求之前的钩子, 通过该钩子可以设置自己的变量 self.http_client = httpx.AsyncClient() check_set.add(id(self.http_client)) async def before_reset_context(self, request: Request, response: Optional[Response]) -> None: # 准备退出中间件的钩子, 这步奏后会清掉上下文 await self.http_client.aclose() context_model: ContextModel = ContextModel() app.add_middleware(ContextMiddleware, context_model=context_model) async def test_ensure_future() -> None: assert id(context_model.http_client) in check_set def test_run_in_executor() -> None: assert id(conte
                
                

-六神源码网