根本原因是消费者未正确确认消息,导致中间件重复投递:rabbitmq需显式basic_ack(),kafka需正确commit()且处理异常,redis无ack机制需自行实现去重与持久化。

为什么 consuming 多次但只收到一条消息?
根本原因通常是消费者没确认(ack),或确认方式不对,导致消息被 RabbitMQ / Kafka 等中间件反复投递。python 客户端默认行为差异大:比如 pika 默认是 auto-ack 模式,而 kafka-python 默认是手动 commit,且 commit 时机不等于消费完成。
- 检查连接初始化时是否显式设置了
auto_ack=False(pika)或enable_auto_commit=False(kafka-python) - 确认业务逻辑执行完后,是否调用了
channel.basic_ack()或consumer.commit();漏掉这步,下一次重连就会重发 - 注意
commit()在 Kafka 中可能抛出KafkaError,未捕获会导致后续 offset 不更新,形成“假重复”
message.ack() 调用后还重复?
不是 ack 本身失效,而是 ack 的 scope 错了:比如在多线程/协程里,用错 channel 实例,或在异常分支里跳过了 ack 调用。更隐蔽的是,某些 SDK(如 aiokafka)的 ack() 是异步的,没 await 就返回,实际没发出去。
- 确保
ack()和消费逻辑在同一个 channel / consumer 实例上下文中执行 - 在
try/except块中把ack()放在finally里,避免异常绕过 - 用
aiokafka时,必须await msg.ack(),写成msg.ack()不报错但无效 - 测试时可临时关闭 auto-commit 并打印
msg.offset和consumer.committed(topic_partition)对比,看是否滞后
本地调试时消息越积越多?
本质是消费者崩溃或退出太快,没来得及 commit offset,重启后从中断点前开始重读——看起来像重复,其实是“补偿性重放”。尤其常见于脚本式消费(非守护进程)、或使用 docker run --rm 启动的临时容器。
- 加日志:在处理前打
Logging.info(f"processing {msg.offset}"),处理后打logging.info(f"acked {msg.offset}"),观察 gap - 避免用
sys.exit()或未捕获的异常直接退出;改用信号监听(如signal.signal(signal.SIGTERM, ...))做 graceful shutdown - RabbitMQ 场景下,检查 queue 是否设了
durable=True且 consumer 声明时用了相同参数,否则重启后 queue 丢失,消息进 dead letter
用 redis.blpop() 模拟队列也重复?
Redis 本身无 ack 机制,blpop 是原子性弹出,一旦拿到就从 list 消失。所谓“重复”,基本是代码逻辑问题:比如异常后没把消息存回 list,或多个 worker 同时 blpop 同一个 key 却没做去重,又或者误把失败消息写进了另一个 list 导致二次消费。
立即学习“Python免费学习笔记(深入)”;
- 不要依赖
blpop做可靠队列;真要用,必须自己实现 pending list + 定时 retry + ack 标记(例如用setnx记录 msg_id) - 检查是否有多个
blpop连接指向同一 key,且没做 consumer ID 隔离 - 网络超时或连接中断时,
blpop抛ConnectionError,此时消息已丢失,需靠上游重发或本地持久化兜底
最常被忽略的点:不同中间件对“消费完成”的定义完全不同——RabbitMQ 看 ack,Kafka 看 commit,Redis 压根不管。拿一套逻辑套所有场景,必踩坑。