Python dask 的并行数据处理实践

2次阅读

dask.delayed 更适合数据流水线因其构建可调度的dag,支持中间复用、条件分支与失败重算;而concurrent.futures仅适用于独立函数调用。

Python dask 的并行数据处理实践

为什么 dask.delayed 比直接用 concurrent.futures 更适合数据流水线?

dask.delayed 不是简单地把函数扔进线程池,而是构建一个延迟执行的有向无环图(DAG),后续能做任务调度、重试、内存感知和跨节点分发。你写的是“做什么”,不是“怎么做”——这在处理多阶段 etl 时特别关键。

  • 如果你只是跑几个独立函数,concurrent.futures.ThreadPoolExecutor 更轻量、启动更快
  • 但只要涉及中间结果复用(比如 A → B → C,同时 B → D)、条件分支或部分失败重算,dask.delayed 的图能力立刻显出价值
  • 注意:所有被 @dask.delayed 装饰的函数,返回值会自动包装成 Delayed 对象;直接 print 或取值会触发计算,别在定义阶段就调 .compute()
@delayed def load_csv(path):     return pd.read_csv(path) <p>@delayed<br /> def clean(df): return df.dropna()</p><h1>这里没计算,只建图</h1><p>cleaned = clean(load_csv("data.csv"))</p>

dask.dataframe 读 CSV 卡住或内存暴涨?检查这三件事

dask.dataframe.read_csv 默认按行数切分块(blocksize),但实际切分依赖文件是否含换行符、压缩格式、是否有 header 行——这些都会让块大小失控,导致某一块巨长、其他块为空,甚至卡死在元数据探测阶段。

  • 确保文件是纯文本、LF 换行、无嵌入换行符的 CSV;如果用 excel 导出,先用 dos2unixpython 清洗一遍
  • 显式指定 blocksize="64MB"(别用字节硬算,用字符串"128MB"),并配合 sample=10000 控制 schema 推断采样行数
  • 遇到 OSError: [errno 22] Invalid argument,大概率是 windows 下路径含中文或 UNC 路径未转义,改用 r"serverpath" 或正斜杠

本地运行 dask.distributed.Client 反而比单线程慢?常见配置误用

开一个本地 Client(n_workers=4, threads_per_worker=1) 听起来合理,但默认会启用 dashboard(Web ui)、心跳检测、序列化/反序列化日志——对小数据集(

立即学习Python免费学习笔记(深入)”;

  • 小规模调试优先用 scheduler="threads"scheduler="synchronous",完全绕过调度器
  • 必须用 Client 时,关掉不需要的功能:dashboard_address=Nonesilence_logs=Logging.ERROR
  • Client 启动后默认连接 localhost:8786,如果端口被占,会静默 fallback 到随机端口——查 client.dashboard_link 才知道它到底在哪,别猜

dask.Array 处理图像却报 Array chunk size too large

dask.array 把大数组切块(chunks)来并行,但图像数据维度固定(如 (1000, 1024, 1024)),若 chunk 设置不当,容易生成单块超 1GB 的内存块,触发 ValueError

  • 别用 chunks=-1chunks=(1000, "auto", "auto")——”auto” 在高维下可能把第一维全塞进一块
  • 图像堆栈推荐按切片维度拆:如 chunks=(1, 512, 512),确保每块最多一张图的一部分
  • da.from_array(arr, chunks=(1, 512, 512)).persist() 替代直接计算,避免重复加载原始数据

事情说清了就结束。真正卡住的地方,往往不在代码怎么写,而在你默认相信的“自动行为”——比如 dask 怎么猜 CSV 分隔符、怎么选 chunk 大小、怎么处理缺失值传播——这些细节不盯住,图建得再漂亮也跑不起来。

text=ZqhQzanResources