Python sqlalchemy 2.0 的异步支持迁移

5次阅读

async_sessionmaker 创建后必须 await 才能获取活跃会话,其返回的 asyncsession 所有操作(如 execute、commit)均需 await,否则协程不执行;select() 查询须用 await session.execute(),不可直接调用 scalars().all();异步引擎须用 create_async_engine 及对应异步驱动,连接池须用 nullpool;事务需显式管理,避免跨 await 边界混用 begin/commit/rollback。

Python sqlalchemy 2.0 的异步支持迁移

async_sessionmaker 创建后必须配 await,不能直接调用

SQLAlchemy 2.0 的异步支持不是“打开开关就自动变快”,而是把整个执行链路推到协程里。最常见错误是写完 async_sessionmaker 还像以前一样 session = async_session(),结果得到一个 AsyncSession 实例——但这个实例所有操作(addcommitexecute)都返回协程对象,不 await 就永远不真正执行。

  • 正确姿势:用 async with async_session() as session: 或显式 await session.commit()
  • 错误现象:RuntimeWarning: coroutine 'AsyncSession.commit' was never awaited,或查询返回空结果、事务不生效
  • 注意 async_sessionmaker 本身不创建会话,它生成的是可 await 的工厂;每次调用都需 await 才拿到活跃会话

select() 查询必须用 await session.execute(),不能用 session.scalars()

同步模式下常用 session.scalars(select(...)).all(),但异步模式下没有等价的阻塞式 scalars()。直接调用会报 AttributeError: 'AsyncScalarResult' Object has no attribute 'all'——因为返回的是惰性迭代器,底层仍是协程封装

  • 必须写成:result = await session.execute(select(User)),再用 result.scalars().all()(注意 all() 是同步方法,但前面的 execute 必须 await)
  • 如果只取单条:(await session.execute(select(User).limit(1))).scalar_one_or_none()
  • 别漏掉 awaitsession.execute() 返回 AsyncResult,不是结果本身;没 await 就只是个未调度的协程对象

engine.connect() 在 async 模式下要换 asyncio.create_engine()

旧代码里用 create_engine("sqlite:///db.db") 配合 sessionmaker,迁移到异步时不能复用。普通 engine 不支持 async,强行传给 async_sessionmaker 会触发 AsyncEngine expected 类型错误。

  • 必须改用:create_async_engine("sqlite+aiosqlite:///db.db", echo=True)(SQLite 需 aiosqlite)、postgresql+asyncpg://... 等带 async driver 的 URL
  • driver 兼容性很关键:mysqlaiomysqlasyncmy,PostgreSQL 强烈推荐 asyncpgpsycopg 3.1+ 才支持 async)
  • 连接池参数也不同:AsyncEnginepoolclass 必须是 NullPool(默认),不能用 QueuePool —— 后者是线程安全设计,和 asyncio Event loop 冲突

嵌套 await 和事务边界容易出错

异步事务不像同步那样靠上下文管理器“自动兜底”。比如在函数 A 中 await session.commit(),然后调用函数 B 做另一组操作,B 里再 await session.rollback(),可能因 session 已关闭或状态混乱导致 InvalidRequestError: this Session's transaction is closed

立即学习Python免费学习笔记(深入)”;

  • 事务边界必须显式对齐:要么整个业务逻辑包在一个 async with session.begin(): 里,要么确保每个 commit/rollback 都对应一次独立的 begin()
  • 避免跨 await 边界传递 session 并混用 begin() 和手动 commit();更安全的做法是让每个协程函数自己管理自己的事务块
  • 特别注意 ORM 关系加载:selectinloadjoinedload 在 async 下行为一致,但 lazy='select' 触发的隐式查询仍需 await,容易漏掉

异步迁移最难的不是语法替换,而是把“线性执行预期”扭转为“协程调度意识”——session 不是资源句柄,是协程上下文;execute 不是动作,是调度请求;commit 不是落盘,是等待事件循环轮到它执行。漏掉一个 await,整条链就卡住不动,还很难 debug。

text=ZqhQzanResources