如何高效并发处理嵌套结构的 API 请求(Python 多线程实践)

6次阅读

如何高效并发处理嵌套结构的 API 请求(Python 多线程实践)

本文教你使用 `concurrent.futures.threadpoolexecutor` 并发调用深层嵌套的 api(如逐个获取 award 详情),将串行耗时从数分钟降至数秒,兼顾代码简洁性与生产可用性。

在处理类似开放采购数据(如 Paraguay 公共合同 API)这类层级嵌套的 jsON 响应时,常见的串行请求模式——即外层遍历 records,内层遍历每个 compiledRelease.awards,再为每个 award.id 发起独立 http 请求——极易成为性能瓶颈。尤其当总量达数百个 compiledRelease、单个 awards 数组含数十甚至上百条记录时,同步阻塞式调用会导致总耗时呈线性增长(例如 500 次请求 × 平均 1.2s = 超 10 分钟),而网络 I/O 实际占用 CPU 极少,大量时间浪费在等待响应上。

此时,线程并发是 python 中最直接、稳定且无需修改业务逻辑的加速方案。concurrent.futures.ThreadPoolExecutor 正是为此类 I/O 密集型任务设计的标准库工具:它自动管理线程池、复用线程、支持结果收集与异常处理,且语法清晰易读。

以下是一个生产就绪的示例实现:

import requests from concurrent.futures import ThreadPoolExecutor, as_completed import time  # 示例 API 调用函数(请替换为你的真实函数) def api_call_function(award_id):     # 注意:真实场景中应添加超时、重试、错误码判断     try:         response = requests.get(f"https://api.example.com/awards/{award_id}", timeout=10)         response.raise_for_status()         return response.json()     except Exception as e:         return {"error": str(e), "award_id": award_id}  # 主处理函数 def fetch_all_award_details(records, max_workers=20):     """     并发获取所有 awards 的完整详情     :param records: 原始 JSON 中的 records 列表     :param max_workers: 线程池最大并发数(建议 10–30,避免被限流)     :return: 所有成功响应的列表(含 award_id 标识)     """     award_ids = []     # 第一步:扁平化提取全部 award ID(保留上下文信息可选)     for item in records:         compiled = item.get("compiledRelease", {})         awards = compiled.get("awards", [])         for award in awards:             award_id = award.get("id")             if award_id:                 award_ids.append(award_id)      print(f"共发现 {len(award_ids)} 个 award ID,启动并发请求...")      results = []     # 第二步:使用线程池并发执行     with ThreadPoolExecutor(max_workers=max_workers) as executor:         # 提交所有任务         future_to_id = {executor.submit(api_call_function, aid): aid for aid in award_ids}          # 按完成顺序收集结果(非提交顺序)         for future in as_completed(future_to_id):             award_id = future_to_id[future]             try:                 data = future.result()                 results.append({"award_id": award_id, "data": data})             except Exception as exc:                 results.append({"award_id": award_id, "error": str(exc)})      return results  # 使用示例 if __name__ == "__main__":     # 假设你已通过 requests 获取原始 records     # response = requests.get("https://api.example.com/records?limit=100")     # records = response.json().get("records", [])      # 这里用模拟数据演示结构     sample_records = [         {             "compiledRelease": {                 "awards": [                     {"id": "award-001"},                     {"id": "award-002"}                 ]             }         },         {             "compiledRelease": {                 "awards": [                     {"id": "award-003"},                     {"id": "award-004"},                     {"id": "award-005"}                 ]             }         }     ]      start_time = time.time()     all_details = fetch_all_award_details(sample_records, max_workers=10)     end_time = time.time()      print(f"n✅ 完成!共获取 {len(all_details)} 条响应,耗时 {end_time - start_time:.2f} 秒")     # 可进一步处理:筛选成功项、写入文件、入库等

? 关键注意事项与最佳实践:

立即学习Python免费学习笔记(深入)”;

  • 合理设置 max_workers:通常 10–30 是安全起点;过高可能触发服务端限流或本地端口耗尽;可通过小规模测试(如 50 个 ID)对比不同值的吞吐量来调优。
  • 务必添加超时与异常处理:HTTP 请求必须设 timeout(推荐 5–15 秒),否则单个失败请求会阻塞整个线程;as_completed() 确保不因某次失败中断整体流程。
  • 避免全局共享状态:线程间不要直接修改同一字典/列表;本例中每个 future.result() 返回独立数据,天然线程安全。
  • 替代方案说明:虽 asyncio + aiohttp 在理论吞吐上更高,但需全面重构异步风格(包括 HTTP 客户端、JSON 解析等),学习成本与维护复杂度显著上升;对绝大多数 API 场景,ThreadPoolExecutor 已足够高效且稳健。

通过以上改造,你的脚本可在数秒内完成原本需数分钟的批量请求,同时保持代码逻辑清晰、易于调试和扩展。

text=ZqhQzanResources