sqlalchemy 如何写 upsert 并在 PostgreSQL/MySQL 中兼容

7次阅读

postgresql用ON CONFLICT、mysql用ON DUPLICATE KEY UPDATE实现原子upsert;SQLAlchemy需按方言分别调用on_conflict_do_update()或on_duplicate_key_update(),不可混用;merge()非原子操作,高并发下有竞态风险。

sqlalchemy 如何写 upsert 并在 PostgreSQL/MySQL 中兼容

PostgreSQL 用 ON CONFLICT 实现 upsert

PostgreSQL 原生支持 INSERT ... ON CONFLICT,这是最直接、最高效的方式。SQLAlchemy 1.4+ 提供了 insert().on_conflict_do_update() 接口,但仅对 PostgreSQL 生效。

关键点:必须明确指定冲突目标(如主键或唯一索引列),否则会报 ON CONFLICT clause does not specify any conflict targets 错误。

  • index_elements 参数填主键或唯一约束字段名(如 ['id']['email']
  • set_ 里写更新逻辑,值可用 excluded.column_name 引用插入行的原始值
  • 若表无显式命名唯一约束,不能用 index_name,只能靠 index_elements
from sqlalchemy.dialects.postgresql import insert 

stmt = insert(User).values(id=1, name='Alice', email='a@example.com') stmt = stmt.on_conflict_do_update( indexelements=['id'], # 冲突检测列 set={'name': stmt.excluded.name, 'email': stmt.excluded.email} ) session.execute(stmt) session.commit()

MySQL 用 ON DUPLICATE KEY UPDATE 替代

MySQL 没有 ON CONFLICT,对应的是 INSERT ... ON DUPLICATE KEY UPDATE。SQLAlchemy 不提供跨方言的统一 upsert API,所以得手动适配。

常见错误:直接复用 PostgreSQL 的 on_conflict_do_update() 会导致 AttributeError(该方法在 MySQL dialect 中不存在)。

  • 检查 engine.dialect.name,区分 PostgreSQL / MySQL 分支处理
  • MySQL 下用 insert(...).on_duplicate_key_update()(需 SQLAlchemy 1.4+ + pymysql/mariadb driver)
  • on_duplicate_key_update 的参数是字典,键为列名,值可为字面量或 text() 表达式
from sqlalchemy.dialects.mysql import insert as mysql_insert 

if engine.dialect.name == 'mysql': stmt = mysql_insert(User).values(id=1, name='Alice', email='a@example.com') stmt = stmt.on_duplicate_key_update( name=stmt.inserted.name, email=stmt.inserted.email ) else:

PostgreSQL 分支(同上)

stmt = insert(User).values(...) stmt = stmt.on_conflict_do_update(...)

兼容写法:用原生 SQL + text() 绕过方言限制

如果项目必须用单一代码路径覆盖多数据库,且不想写 if 分支,可退回到原生 SQL 字符串 + text(),由开发者保证语法正确性。

风险在于失去 ORM 层的类型转换和参数绑定安全,容易引入 SQL 注入或类型错位(比如把字符串当 int 绑定)。

  • PostgreSQL:INSERT INTO user (id, name) VALUES (:id, :name) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name
  • MySQL:INSERT INTO user (id, name) VALUES (:id, :name) ON DUPLICATE KEY UPDATE name = VALUES(name)
  • 所有参数必须用 :param 占位符,再传入 session.execute(text(...), {'id': 1, 'name': 'Alice'})

注意:MySQL 的 VALUES(column) 是特殊函数,不是字面值;PostgreSQL 的 EXCLUDED 是关键字,大小写敏感。

为什么不能依赖 merge()

session.merge() 看似能“自动 upsert”,但它不是原子操作:先 select 判断存在性,再决定 INSERTUPDATE。在高并发下可能产生竞态(两个事务同时查不到记录,然后都插入,触发唯一约束错误)。

  • merge() 还会触发完整对象加载和属性比对,性能差于原生命令
  • 它不规避数据库层的约束冲突,仍可能抛出 IntegrityError
  • MySQL 和 PostgreSQL 对 merge() 的行为一致,但底层仍是两步,无法替代真正的 upsert

真正需要 upsert 的场景(如日志去重、计数器更新、缓存同步),必须用数据库原生的原子插入/更新语义,而不是 ORM 的模拟逻辑。

text=ZqhQzanResources