Python任务调度模型教程_生产者消费者模式

9次阅读

生产者消费者模式是解耦任务生成与执行的并发模型,由生产者线程向线程安全队列put任务、消费者线程get并处理,配合task_done和join实现同步,适用于日志处理等高吞吐场景。

Python任务调度模型教程_生产者消费者模式

什么是生产者消费者模式

生产者消费者模式是一种经典的并发编程模型,用于解耦任务的生成与执行。在python任务调度场景中,它表现为:一个或多个线程(或进程)负责生成任务(生产者),把任务放入共享队列;另一个或多个工作线程(消费者)持续从队列中取出任务并执行。这种结构天然适合异步、批量、高吞吐的任务调度系统,比如日志处理、定时爬虫、消息推送等。

核心组件:队列 + 线程/进程协作

Python标准库中的 queue.Queue 是实现该模式的关键——它线程安全,内置阻塞与超时机制,无需手动加锁。搭配 threadingmultiprocessing 模块即可快速搭建调度骨架。

  • 使用 queue.Queue(maxsize=0) 创建无限长队列;设为正整数可控制缓冲区大小,避免内存溢出
  • 生产者调用 put(item) 入队,可选 block=True, timeout=2 防止无限等待
  • 消费者调用 get() 出队,配合 task_done()join() 实现任务完成同步
  • 推荐用守护线程(daemon=True)运行消费者,主程序退出时自动终止

一个轻量可用的调度示例

下面是一个基于多线程的最小可行调度器,模拟“每秒生成3个任务,由2个工人并发处理”:

import queue import threading import time import random 

共享任务队列

task_queue = queue.Queue()

立即学习Python免费学习笔记(深入)”;

def producer(): for i in range(10): task = f"task-{i}" print(f"[生产] {task}") task_queue.put(task) time.sleep(1) # 模拟不均匀生产节奏

def worker(worker_id): while True: try: task = task_queue.get(timeout=3) # 3秒无任务则退出 print(f"[工人{worker_id}] 正在处理 {task}") time.sleep(random.uniform(0.5, 2)) # 模拟耗时操作 task_queue.task_done() except queue.Empty: break

启动1个生产者、2个消费者

threading.Thread(target=producer).start() for i in range(2): threading.Thread(target=worker, args=(i+1,), daemon=True).start()

等待所有任务完成

task_queue.join() print("全部任务执行完毕")

进阶建议:应对真实生产环境

简单示例满足学习,但上线需考虑健壮性与可观测性:

  • 任务应封装为可序列化对象(如字典或dataclass),便于持久化或跨进程传递
  • 加入异常捕获与重试逻辑,避免单个失败任务阻塞整个队列
  • Logging 替代 print,记录任务ID、开始/结束时间、耗时、错误
  • 必要时替换为 redis Queue(如RQ)或 apache airflow,支持分布式、持久化、Web监控
  • 若任务IO密集(如http请求),优先用 threading;若CPU密集,考虑 multiprocessingconcurrent.futures.ProcessPoolExecutor

text=ZqhQzanResources