Python 并发代码的可测试性设计

20次阅读

asyncio单元测试常卡住或超时,根本原因是测试与被测协程共用事件循环且存在未关闭的全局loop、阻塞调用或未await的后台任务;需用@pytest.mark.asyncio、禁用真实I/O、检查遗漏await。

Python 并发代码的可测试性设计

为什么 asyncio 单元测试常卡住或超时

根本原因是测试运行在同一个事件循环中,而被测协程可能依赖全局状态(如未显式关闭的 asyncio.get_Event_loop())、阻塞调用(如 time.sleep())或未 await 的后台任务。测试框架(如 pytest)默认不自动管理异步上下文,直接调用 await my_coro() 在非 async 测试函数里会报 SyntaxError 或静默失败。

实操建议:

立即学习Python免费学习笔记(深入)”;

  • 始终用 @pytest.mark.asyncio(配合 pytest-asyncio 插件)或手动创建/关闭事件循环,避免复用测试间残留的 loop
  • 禁用真实 I/O:用 unittest.mock.patch 替换 asyncio.sleepaiohttp.Clientsession 等,返回预设的 AsyncMock 或已 resolve 的 Future
  • 检查是否遗漏 await:比如误写 task = asyncio.create_task(...) 但没等它完成,导致资源泄漏和测试挂起

如何让 async def 函数支持同步调用与测试

编码 await 会让函数无法在同步上下文中被简单验证逻辑(比如只测输入输出,不关心并发)。强行用 asyncio.run() 包裹又引入额外调度开销,且在已有事件循环中会报 RuntimeError: asyncio.run() cannot be called from a running event loop

实操建议:

立即学习Python免费学习笔记(深入)”;

  • 把核心逻辑拆成纯函数(def),异步外壳只负责调度和 I/O:例如 async def fetch_user(user_id): return await _fetch_user_impl(http_client, user_id),测试时直接调 _fetch_user_impl
  • 对必须保留 async 外壳的场景,加参数控制执行模式:async def process_data(data, run_sync=False): ...,内部用 if run_sync: return _do_work(data)
  • 避免在函数内调用 asyncio.get_running_loop() —— 它在无 loop 环境下直接抛 RuntimeError;改用 asyncio.get_event_loop() 并捕获异常做 fallback

Threading.Threadmultiprocessing.Process 的测试隔离难点

线程/进程启动后脱离测试主线程控制,assert 失败不会中断测试进程,日志可能丢失,且共享状态(如全局变量、文件句柄)导致测试间污染。更麻烦的是,coverage.py 默认不收集子进程代码覆盖率。

实操建议:

立即学习Python免费学习笔记(深入)”;

  • queue.Queuemultiprocessing.Queue 收集子线程/进程的错误信息,主测试线程主动 get(timeout=...) 检查结果,避免无限等待
  • 禁止在并发单元中直接操作模块级全局状态;改用传参方式注入配置或状态对象(如 cache: dict
  • 运行覆盖率时加参数:coverage run --parallel-mode -m pytest,再用 coverage combine && coverage report 合并所有进程数据
  • threading.Timerthreading.Event 类型的等待逻辑,统一替换为可 mock 的接口(如 wait_fn: Callable[[], None]),测试时注入 Lambda: None 快速通过

Mock 异步依赖时最易忽略的三个细节

很多人用 AsyncMock 却仍遇到 TypeError: Object AsyncMock can't be used in 'await' expression,或 mock 返回值类型不对导致下游解析失败。

实操建议:

立即学习Python免费学习笔记(深入)”;

  • AsyncMock 默认返回另一个 AsyncMock,不是可 await 的协程;要让它返回值,必须设 return_valueside_effect,例如:mock_fetch.return_value = {"id": 1},而非 mock_fetch.return_value = AsyncMock(return_value={"id": 1})
  • mock 对象的返回类型需匹配真实调用签名:如果原函数返回 list[dict],mock 就不能只返回 dict,否则业务代码里的 for item in result: 会报 TypeError: 'dict' object is not iterable
  • async for 场景(如异步生成器),mock 必须是真正的异步迭代器:用 AsyncMock().__aiter__.return_value = [1, 2, 3].__iter__() 不行;正确做法是定义一个带 __aiter____anext__ 的类,或用 AsyncMockside_effectStopAsyncIteration

并发代码的可测试性不在于“能不能测”,而在于“测的时候有没有能力切断所有外部依赖路径”。任何隐式调度、全局状态、未声明的 I/O 都是测试稳定性的裂缝。

text=ZqhQzanResources