
本文教你使用 `concurrent.futures.threadpoolexecutor` 并发调用深层嵌套的 api(如逐个获取 award 详情),将串行耗时从数分钟降至数秒,兼顾代码简洁性与生产可用性。
在处理类似开放采购数据(如 Paraguay 公共合同 API)这类层级嵌套的 jsON 响应时,常见的串行请求模式——即外层遍历 records,内层遍历每个 compiledRelease.awards,再为每个 award.id 发起独立 http 请求——极易成为性能瓶颈。尤其当总量达数百个 compiledRelease、单个 awards 数组含数十甚至上百条记录时,同步阻塞式调用会导致总耗时呈线性增长(例如 500 次请求 × 平均 1.2s = 超 10 分钟),而网络 I/O 实际占用 CPU 极少,大量时间浪费在等待响应上。
此时,多线程并发是 python 中最直接、稳定且无需修改业务逻辑的加速方案。concurrent.futures.ThreadPoolExecutor 正是为此类 I/O 密集型任务设计的标准库工具:它自动管理线程池、复用线程、支持结果收集与异常处理,且语法清晰易读。
以下是一个生产就绪的示例实现:
import requests from concurrent.futures import ThreadPoolExecutor, as_completed import time # 示例 API 调用函数(请替换为你的真实函数) def api_call_function(award_id): # 注意:真实场景中应添加超时、重试、错误码判断 try: response = requests.get(f"https://api.example.com/awards/{award_id}", timeout=10) response.raise_for_status() return response.json() except Exception as e: return {"error": str(e), "award_id": award_id} # 主处理函数 def fetch_all_award_details(records, max_workers=20): """ 并发获取所有 awards 的完整详情 :param records: 原始 JSON 中的 records 列表 :param max_workers: 线程池最大并发数(建议 10–30,避免被限流) :return: 所有成功响应的列表(含 award_id 标识) """ award_ids = [] # 第一步:扁平化提取全部 award ID(保留上下文信息可选) for item in records: compiled = item.get("compiledRelease", {}) awards = compiled.get("awards", []) for award in awards: award_id = award.get("id") if award_id: award_ids.append(award_id) print(f"共发现 {len(award_ids)} 个 award ID,启动并发请求...") results = [] # 第二步:使用线程池并发执行 with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交所有任务 future_to_id = {executor.submit(api_call_function, aid): aid for aid in award_ids} # 按完成顺序收集结果(非提交顺序) for future in as_completed(future_to_id): award_id = future_to_id[future] try: data = future.result() results.append({"award_id": award_id, "data": data}) except Exception as exc: results.append({"award_id": award_id, "error": str(exc)}) return results # 使用示例 if __name__ == "__main__": # 假设你已通过 requests 获取原始 records # response = requests.get("https://api.example.com/records?limit=100") # records = response.json().get("records", []) # 这里用模拟数据演示结构 sample_records = [ { "compiledRelease": { "awards": [ {"id": "award-001"}, {"id": "award-002"} ] } }, { "compiledRelease": { "awards": [ {"id": "award-003"}, {"id": "award-004"}, {"id": "award-005"} ] } } ] start_time = time.time() all_details = fetch_all_award_details(sample_records, max_workers=10) end_time = time.time() print(f"n✅ 完成!共获取 {len(all_details)} 条响应,耗时 {end_time - start_time:.2f} 秒") # 可进一步处理:筛选成功项、写入文件、入库等
? 关键注意事项与最佳实践:
立即学习“Python免费学习笔记(深入)”;
- 合理设置 max_workers:通常 10–30 是安全起点;过高可能触发服务端限流或本地端口耗尽;可通过小规模测试(如 50 个 ID)对比不同值的吞吐量来调优。
- 务必添加超时与异常处理:HTTP 请求必须设 timeout(推荐 5–15 秒),否则单个失败请求会阻塞整个线程;as_completed() 确保不因某次失败中断整体流程。
- 避免全局共享状态:线程间不要直接修改同一字典/列表;本例中每个 future.result() 返回独立数据,天然线程安全。
- 替代方案说明:虽 asyncio + aiohttp 在理论吞吐上更高,但需全面重构为异步风格(包括 HTTP 客户端、JSON 解析等),学习成本与维护复杂度显著上升;对绝大多数 API 场景,ThreadPoolExecutor 已足够高效且稳健。
通过以上改造,你的脚本可在数秒内完成原本需数分钟的批量请求,同时保持代码逻辑清晰、易于调试和扩展。