如何高效分块处理并排序超大CSV文件(Python教程)

1次阅读

如何高效分块处理并排序超大CSV文件(Python教程)

本文介绍使用python分块读取超大csv文件的方法,避免内存溢出,并通过fileinput模块逐行解析、按需切分字段,结合用户输入实现灵活筛选与排序。

在处理GB级csv文件时,一次性加载全部数据到内存(如用pandas.read_csv()或csv.reader配合list())极易导致内存耗尽或程序崩溃。此时,流式分块处理是更稳健的选择——不追求“一次性分割为4个物理文件”,而是逻辑上按字段维度切分关注区域,并逐行解析、即时分类、动态聚合,从而支撑后续排序与查询。

以下是一个专业、可扩展的实现方案:

✅ 推荐方案:使用 fileinput 流式读取 + 字段映射 + 动态分组

import fileinput from collections import defaultdict import operator  def load_and_partition_by_key(filepath, key_field=0, encoding="utf-8"):     """     按指定列(如 country/abv/year)流式分组数据,返回字典:{key: [(country, abv, year, expectancy), ...]}     key_field: 0=country, 1=abv, 2=year, 3=expectancy(支持任意列作为逻辑“部分”)     """     partitions = defaultdict(list)      with fileinput.input(filepath, encoding=encoding) as f:         for line_num, line in enumerate(f, 1):             line = line.strip()             if not line:  # 跳过空行                 continue             try:                 parts = line.split(",")                 # 安全提取并类型转换                 country = parts[0].strip('"' ')                 abv = parts[1].strip('"' ')                 year = int(parts[2])                 expectancy = float(parts[3])                  # 按用户指定字段分组(例如:按国家分组 → 一个“part”对应一个国家的所有年份记录)                 key = parts[key_field].strip('"' ') if key_field < len(parts) else "unknown"                 partitions[key].append((country, abv, year, expectancy))              except (ValueError, IndexError) as e:                 print(f"⚠️ 警告:第 {line_num} 行格式异常,已跳过 — {e}")                 continue      return partitions  # 示例:按国家分4个典型部分(取前4个非空国家名),并分别排序 if __name__ == "__main__":     data_by_country = load_and_partition_by_key("life_expectancy.csv", key_field=0)      # 用户输入目标国家(模拟交互式筛选)     target_countries = ["China", "India", "Brazil", "Nigeria"]  # 或由 input() 获取     for country in target_countries:         if country in data_by_country:             # 对该国数据按年份升序排序             sorted_by_year = sorted(data_by_country[country], key=operator.itemgetter(2))             print(f"n? {country}(共{len(sorted_by_year)}条记录):")             for record in sorted_by_year[:3]:  # 仅显示前3年示例                 print(f"  {record[2]}年: {record[3]:.2f}岁")         else:             print(f"? {country} 数据未找到")

? 关键优势说明:

  • 零内存压力:fileinput 逐行读取,每行仅占用极小内存;
  • 弹性“分块”:所谓“分成4部分”,本质是按业务维度(国家/年份/缩写/指标)逻辑聚类,而非物理拆分文件;
  • 强健容错:自动跳过空行、异常行,并提示错误位置;
  • 即用即排:每个逻辑分组(如某国所有年份)可独立调用 sorted() 或 heapq.nsmallest() 实现高效排序;
  • 无缝扩展:后续可轻松接入 pandas.DataFrame(对单个分组)、写入新CSV,或构建内存索引。

⚠️ 注意事项:

  • 确保CSV无嵌套逗号(否则需用 csv 模块替代 str.split(","));
  • 若需严格按物理文件切分(如将原文件拆成4个子文件),应使用 itertools.islice 配合 open() 分段写入,但通常不推荐——它无法保证语义完整性(如切断某国连续年份);
  • 中文路径或特殊字符请始终显式指定 encoding="utf-8-sig" 防止乱码。

掌握这种“流式解析 + 键值分组 + 懒排序”范式,你不仅能应对千万行CSV,还能为构建实时分析管道打下坚实基础。

text=ZqhQzanResources