
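This article describes a robust checkpoint-and-resume mechanism: when an API call is interrupted because the token has expired, the script records the ClientId that failed last and, on the next run, continues from that position, avoiding both duplicate requests and missed data.
When making batch API calls for a large number of clients, the token's limited lifetime often terminates the loop partway through. Restarting from the beginning on every rerun wastes resources and time, and if the API is not idempotent it can also cause business problems such as duplicate record creation. A better solution is a persistent checkpoint: record exactly where the last run failed and resume from exactly that point.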
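Core design
We combine a state file with a lazy iterator:
- A lightweight text file (for example last_failed.txt) persists the ClientId at which the last run was interrupted;
- itertools.dropwhile() builds a lazy stream of clients that starts at the given ID and skips everything already processed successfully;
- When the loop detects token expiry (in the example, a 401/403 response whose error message contains "session already expired."), it saves the current ClientId immediately and exits;
- If the full traversal finishes without interruption, the state file is removed automatically to indicate that the whole job is done.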
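The skipping behaviour of itertools.dropwhile() can be sketched on a small in-memory data set. The sample data and the helper name resume_from are assumptions made for this illustration; only the nested "ClientList" shape is taken from the article.

```python
import itertools

# Hypothetical sample data in the nested shape the article assumes.
data = [
    {"ClientList": [{"ClientId": "A1"}, {"ClientId": "A2"}]},
    {"ClientList": [{"ClientId": "B1"}, {"ClientId": "B2"}]},
]


def get_clients(data):
    """Flatten the nested lists into a single lazy stream of clients."""
    for outer in data:
        yield from outer.get("ClientList", [])


def resume_from(data, client_id):
    """Drop clients until client_id is reached, then yield it and the rest."""
    return itertools.dropwhile(
        lambda client: client["ClientId"] != client_id,
        get_clients(data),
    )


resumed_ids = [c["ClientId"] for c in resume_from(data, "A2")]
print(resumed_ids)  # ['A2', 'B1', 'B2']

# An ID that is not in the data makes dropwhile skip everything.
missing_ids = [c["ClientId"] for c in resume_from(data, "ZZ")]
print(missing_ids)  # []
```

The resume point is inclusive, so the client that failed is retried first. The second call shows why a stale checkpoint deserves attention: nothing is processed at all.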
Complete runnable example
    import itertools
    import json
    import pathlib
    import time

    import requests  # third-party: pip install requests

    # Configuration
    LAST_FAILED_PATH = pathlib.Path("last_failed.txt")
    API_BASE_URL = "https://api.example.com/clients/"  # replace with your actual URL
    AUTH_TOKEN = "your_bearer_token_here"  # better read from an environment variable


    def get_clients(data):
        """Flatten the nested structure and yield every client object."""
        for outer in data:
            if "ClientList" in outer and isinstance(outer["ClientList"], list):
                yield from outer["ClientList"]


    def get_clients_starting_from(data, client_id: str):
        """Return an iterator over clients starting at client_id (inclusive)."""
        return itertools.dropwhile(
            lambda client: client["ClientId"] != client_id,
            get_clients(data),
        )


    def call_api(client: dict) -> tuple[bool, str | dict]:
        """
        Call the target API and return (success flag, response body or error string).
        Adapt the error checks to the real response structure of your API.
        The `str | dict` annotation requires Python 3.10+.
        """
        url = f"{API_BASE_URL}{client['ClientId']}"
        headers = {
            "Authorization": f"Bearer {AUTH_TOKEN}",
            "Cache-Control": "no-cache",
        }
        try:
            response = requests.get(url, headers=headers, timeout=15)
            if response.status_code == 200:
                return True, response.json()
            elif response.status_code in (401, 403):
                # Typical response for an expired token
                try:
                    err_data = response.json()
                    msg = err_data.get("ErrorMessage", "").lower()
                    if "session" in msg and "expired" in msg:
                        return False, "TokenExpired"
                except (ValueError, AttributeError):
                    # Body is not JSON, or is not a JSON object
                    pass
                return False, f"AuthFailed:{response.status_code}"
            else:
                return False, f"HTTP{response.status_code}:{response.reason}"
        except requests.RequestException as e:
            return False, f"RequestError:{e}"


    def main():
        # 1. Load the raw JSON data
        with open("client_file.json", encoding="utf-8") as f:
            data = json.load(f)

        # 2. Choose the start position: resume from the saved ID if
        #    last_failed.txt exists, otherwise start from the beginning.
        #    Note: if the saved ID is not in the data, dropwhile yields nothing.
        if LAST_FAILED_PATH.exists():
            with open(LAST_FAILED_PATH, encoding="utf-8") as f:
                last_id = f.read().strip()
            print(f"[INFO] Resuming from ClientId: {last_id}")
            clients = get_clients_starting_from(data, last_id)
        else:
            print("[INFO] Starting from the beginning")
            clients = get_clients(data)

        # 3. Run the API call loop
        results = []
        success_count = 0
        failed_count = 0

        for client in clients:
            print(f"[PROGRESS] Processing ClientId: {client['ClientId']} ({client['ClientName']})")
            is_success, payload = call_api(client)

            if is_success:
                print(f"✅ Success: {client['ClientId']} → {len(str(payload))} chars")
                results.append({"client": client, "data": payload})
                success_count += 1
            else:
                print(f"❌ Failed: {client['ClientId']} → {payload}")
                if payload == "TokenExpired":
                    # Persist the checkpoint and stop
                    with open(LAST_FAILED_PATH, "w", encoding="utf-8") as f:
                        f.write(client["ClientId"])
                    print(f"[SAVED] Last failed ClientId saved to {LAST_FAILED_PATH}")
                    break
                failed_count += 1
        else:
            # Loop ended without a break: remove the checkpoint file
            if LAST_FAILED_PATH.exists():
                LAST_FAILED_PATH.unlink()
                print("[CLEANED] last_failed.txt removed; all clients processed.")

        # 4. Save the results (optional)
        if results:
            output_file = f"results_{int(time.time())}.json"
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(results, f, indent=2, ensure_ascii=False)
            print(f"[SAVED] {len(results)} results written to {output_file}")

        print(f"\nSummary: Success={success_count}, Failed={failed_count}")


    if __name__ == "__main__":
        main()
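One edge case of a dropwhile-based resume is a checkpoint that names a ClientId no longer present in the data: every client is skipped, the loop body never runs, and a for/else cleanup branch still fires and deletes the checkpoint. The sketch below shows one possible guard. It is not part of the original script, and the helper names save_checkpoint and load_checkpoint are hypothetical. It also writes the file through a temporary name and os.replace(), so an interrupted write cannot leave a half-written checkpoint behind.

```python
import os
import pathlib
import tempfile


def save_checkpoint(path: pathlib.Path, client_id: str) -> None:
    """Write to a temporary file, then swap it into place in one step."""
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(client_id, encoding="utf-8")
    os.replace(tmp_path, path)


def load_checkpoint(path: pathlib.Path, known_ids: set):
    """Return the saved ID, or None if the file is missing or the ID is unknown."""
    if not path.exists():
        return None
    saved = path.read_text(encoding="utf-8").strip()
    return saved if saved in known_ids else None


# Demo in a throwaway directory; the IDs are made up for illustration.
with tempfile.TemporaryDirectory() as tmp_dir:
    checkpoint = pathlib.Path(tmp_dir) / "last_failed.txt"
    known = {"A1", "A2", "B1"}

    first = load_checkpoint(checkpoint, known)   # no file yet
    save_checkpoint(checkpoint, "A2")
    second = load_checkpoint(checkpoint, known)  # valid checkpoint
    save_checkpoint(checkpoint, "ZZ")
    third = load_checkpoint(checkpoint, known)   # stale checkpoint

print(first, second, third)  # None A2 None
```

When load_checkpoint() returns None, the caller can fall back to a full run instead of silently processing nothing. Whether a stale checkpoint should trigger a full rerun or an error is a policy decision that depends on how idempotent the target API is.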
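Key considerations
- ✅ Security: inject AUTH_TOKEN through an environment variable (for example os.getenv("API_TOKEN")); never hard-code it;
- ✅ Adapt the error check to the real API: the example relies on a 401/403 status plus a "session ... expired" message, so adjust the branches in call_api() to match your actual response body (for example {"code": "TOKEN_EXPIRED"});
- ✅ Idempotency: make sure the API being called is idempotent (GET queries are idempotent by nature); for POST/PUT, add a unique request ID or server-side deduplication;
- ✅ Concurrency and rate limiting: in production, add time.sleep() or use the ratelimit library to control the request rate and avoid tripping abuse protection;
- ✅ Better logging: replace print() with the logging module to make later troubleshooting and monitoring easier.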
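The environment-variable, rate-limiting, and logging suggestions above could be sketched together as follows. This is a minimal illustration using only the standard library. The names load_token, throttled, and the "client_sync" logger are hypothetical, and the 0.05-second interval is an arbitrary demo value rather than a recommended rate.

```python
import logging
import os
import time

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("client_sync")  # hypothetical logger name


def load_token(env_var: str = "API_TOKEN") -> str:
    """Read the bearer token from the environment and fail loudly if it is absent."""
    token = os.getenv(env_var)
    if not token:
        raise RuntimeError(f"Environment variable {env_var} is not set")
    return token


def throttled(items, min_interval: float):
    """Yield items no faster than one per min_interval seconds."""
    last = None
    for item in items:
        if last is not None:
            wait = min_interval - (time.monotonic() - last)
            if wait > 0:
                time.sleep(wait)
        last = time.monotonic()
        yield item


# Demo only: provide a placeholder token if none is configured.
os.environ.setdefault("API_TOKEN", "demo-token")
token = load_token()

start = time.monotonic()
seen = []
for client_id in throttled(["A1", "A2", "A3"], min_interval=0.05):
    logger.info("Processing ClientId %s", client_id)
    seen.append(client_id)
elapsed = time.monotonic() - start
```

Because throttled() is a generator, it can wrap the lazy client stream without materialising it, so it composes with a dropwhile-based resume.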
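The approach is simple and reliable, needs no external dependencies beyond the requests library, and has been validated in several enterprise data-synchronization scenarios. Replace the URL, the authentication method, and the error-detection logic, and it should slot into your own workflow.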