Python 事务性出队的数据库 + MQ 两阶段提交

2次阅读

mysqlselectfor update需用主键或唯一索引加锁,避免全表扫描;出队推荐read committed隔离级别;mq发送失败须区分超时与明确拒绝,前者标记重试、后者才回滚;db更新必须在redis删除之前;禁用auto_commit=true以保障事务一致性。

Python 事务性出队的数据库 + MQ 两阶段提交

MySQL 里怎么让 SELECT ... FOR UPDATE 真正锁住要出队的记录

不加对的锁,事务性出队就只是“看着像原子”,实际会丢消息或重复消费。核心是:必须在事务内、用主键或唯一索引字段加锁,且不能走全表扫描。

  • SELECT id, payload FROM queue_table WHERE status = 'pending' ORDER BY created_at LIMIT 1 FOR UPDATE —— 错!status 没索引,MySQL 可能升级为表锁,阻塞严重甚至死锁
  • 正确做法:给 statuscreated_at 建联合索引,或直接按主键 id 查(如 WHERE id = ? AND status = 'pending'),确保走行级锁
  • 注意隔离级别:READ COMMITTED 下,FOR UPDATE 只锁命中的行;REPEATABLE READ(默认)下可能锁住间隙,防幻读但容易锁多——出队场景推荐前者

发 MQ 失败时,怎么回滚数据库而不留“僵尸任务”

单纯靠 try...except + rollback() 不够。MQ 发送失败分两类:网络超时(可能已发成功)、业务拒绝(明确失败)。前者必须幂等,后者才能安全回滚。

  • 发送前,在 DB 记录 mq_status = 'sending',再发 MQ;成功后更新为 'sent'
  • 如果 MQ 客户端抛 KafkaTimeoutErrorConnectionRefusedError,不要直接 rollback,而是标记为 'retry_pending',由后台任务重试(最多 3 次)
  • 只有收到明确失败响应(如 rabbitmqChannelClosedByBroker 且 reply_code=312)才执行 UPDATE queue_table SET status = 'pending' WHERE id = ? 回滚

为什么不能用 auto_commit=True 配合手动 commit() 来搞两阶段

python DB API 规范里,auto_commit=True 会让每个 SQL 自动提交,后续调用 commit() 实际无效——你根本没法把“锁行”和“更新状态”包进同一个事务。

  • 检查连接是否真在事务中:conn.autocommit 必须为 False,且显式调用 conn.begin() 或用 with conn.cursor() as cur: 上下文(取决于驱动)
  • psycopg2 示例:conn.autocommit = False 后,cur.execute("UPDATE queue_table SET status='processing' WHERE id=%s", (row_id,)) 才受后续 conn.commit() 控制
  • mysql-connector-python 默认 autocommit=False,但 pymysql 默认 True——不查文档直接写,大概率掉坑里

Redis + MySQL 混合队列时,DELUPDATE 的顺序为什么不能反

如果先删 Redis key 再更新 MySQL 状态,中间进程崩溃,就会导致消息丢失(Redis 没了,DB 还卡在 pending);反过来,先更新 DB 再删 Redis,崩了还能靠定时任务捞。

立即学习Python免费学习笔记(深入)”;

  • 标准顺序:DB 更新 status → 成功后 redis_client.delete('queue:task:' + str(id)) → 最后发 MQ
  • Redis 删 key 失败不能 rollback DB:因为 Redis 是缓存层,不是权威源。应记日志并告警,由补偿任务清理残留 key
  • 别用 SETNX分布式锁来协调 DB 和 Redis——它解决不了“DB 提交成功但 Redis 不可用”的问题,反而增加单点依赖

真正难的不是代码写几行,是得想清楚哪一步失败会导致状态不一致,以及那个不一致状态能不能被检测和修复。比如 MQ 发出去了但 DB rollback 了,这时候光靠 DB 字段根本看不出消息已发——得靠消息体里的 trace_id 去对账系统查。

text=ZqhQzanResources