Python生产者消费者模型_并发设计模式

3次阅读

python生产者消费者模型用队列(如queue.queue、multiprocessing.queue)解耦任务生成与处理,因其内置锁机制避免竞态条件;线程用queue.queue配task_done()/join(),进程用multiprocessing.queue并注意close()/join_thread();协程版用asyncio.queue适配i/o密集型。

Python生产者消费者模型_并发设计模式

Python中的生产者消费者模型是解决并发任务调度的经典设计模式,核心在于解耦任务生成与处理逻辑,通过队列协调多线程或多进程协作。

为什么用队列而不是全局变量

直接共享变量容易引发竞态条件和数据不一致。queue.Queue(线程安全)或multiprocessing.Queue(进程间安全)内部已实现锁机制,put()和get()自动加锁,避免手动同步的复杂性。

  • 线程场景优先用queue.Queue,支持block、timeout和task_done()配合join()
  • 多进程场景必须用multiprocessing.Queue,不能跨进程共享普通列表或字典
  • 若需高吞吐,可考虑queue.SimpleQueue(无阻塞/超时,轻量但功能少)

标准线程版实现要点

关键不是写多少线程,而是控制好退出条件和资源清理。

  • 生产者完成生产后,向队列放入None或预定义哨兵值,消费者读到即退出
  • 调用queue.task_done()标记单个任务完成,主流程用queue.join()等待全部处理完毕
  • 避免让消费者死循环轮询——用queue.get(timeout=1)代替无限get(),防止卡死

进程版要注意的坑

multiprocessing.Queue不支持close()后再次使用,且子进程异常退出可能导致队列卡住。

立即学习Python免费学习笔记(深入)”;

  • 务必在每个子进程里显式调用queue.close()queue.join_thread()
  • 生产者进程应限制总任务数,防止队列爆内存(可用queue.qsize()做粗略监控,但注意该方法在unixwindows下行为不同)
  • 传递复杂对象时,确保可被pickle序列化;函数不能是Lambda或嵌套定义

进阶:协程版(asyncio + asyncio.Queue)

适合I/O密集型场景,比线程更轻量,但要求所有操作都是异步的。

  • await queue.put(item)await queue.get(),非阻塞但协程友好
  • 取消任务时,需主动cancel()对应task,并用try/except CancelledError清理
  • 不要混用threading和asyncio —— 协程中调用阻塞IO会拖垮整个事件循环
text=ZqhQzanResources