如何在 Token 过期后从断点继续遍历 JSON 数据并调用 API

5次阅读

如何在 Token 过期后从断点继续遍历 JSON 数据并调用 API

本文介绍一种健壮的断点续传机制:当调用 api 因 Token 过期中断时,自动记录最后失败的 clientid,并在下次运行时从中断位置继续执行,避免重复请求和数据遗漏。

在处理大规模客户端批量 API 调用时,Token 有效期限制常导致循环中途终止。若每次重跑都从头开始,不仅浪费资源、延长耗时,还可能因幂等性缺失引发业务问题(如重复创建)。理想的解决方案是实现可持久化断点续传——即准确记住上一次失败的位置,并精准恢复执行。

核心设计思路

我们采用「状态文件 + 惰性迭代器」组合策略:

  • 使用一个轻量级文本文件(如 last_failed.txt)持久化记录最后中断的 ClientId;
  • 利用 itertools.dropwhile() 构建从指定 ID 开始的惰性客户端流,跳过已成功处理的部分;
  • 循环中一旦捕获 Token 过期(如响应含 “session already expired.” 或状态码为 401/403),立即保存当前 ClientId 并退出;
  • 若完整遍历未中断,则自动清理该状态文件,表示任务已全部完成。

完整可运行示例代码

import itertools import json import pathlib import requests import time  # 配置项 LAST_FAILED_PATH = pathlib.Path("last_failed.txt") API_BASE_URL = "https://api.example.com/clients/"  # 替换为你的实际 URL AUTH_TOKEN = "your_bearer_token_here"  # 建议从环境变量读取  def get_clients(data):     """扁平化提取所有 Client 对象(支持嵌套结构)"""     for outer in data:         if "ClientList" in outer and isinstance(outer["ClientList"], list):             yield from outer["ClientList"]  def get_clients_starting_from(data, client_id: str):     """返回从指定 ClientId 开始(含)的客户端迭代器"""     return itertools.dropwhile(         lambda client: client["ClientId"] != client_id,         get_clients(data)     )  def call_api(client: dict) -> tuple[bool, str | dict]:     """     调用目标 API,返回 (是否成功, 响应内容或错误信息)     实际项目中请根据真实响应结构调整错误判断逻辑     """     url = f"{API_BASE_URL}{client['ClientId']}"     headers = {         "Authorization": f"Bearer {AUTH_TOKEN}",         "Cache-Control": "no-cache",     }      try:         response = requests.get(url, headers=headers, timeout=15)          if response.status_code == 200:             return True, response.json()         elif response.status_code in (401, 403):             # Token 过期典型响应             try:                 err_data = response.json()                 msg = err_data.get("ErrorMessage", "").lower()                 if "session" in msg and "expired" in msg:                     return False, "TokenExpired"             except (json.JSONDecodeError, KeyError):                 pass             return False, f"AuthFailed:{response.status_code}"         else:             return False, f"HTTP{response.status_code}:{response.reason}"      except requests.RequestException as e:         return False, f"RequestError:{str(e)}"  def main():     # 1. 加载原始 JSON 数据     with open("client_file.json", encoding="utf-8") as f:         data = json.load(f)      # 2. 决定起始位置:有 last_failed.txt 则从中断 ID 继续,否则从头开始     if LAST_FAILED_PATH.exists():         with open(LAST_FAILED_PATH, encoding="utf-8") as f:             last_id = f.read().strip()         print(f"[INFO] Resuming from ClientId: {last_id}")         clients = get_clients_starting_from(data, last_id)     else:         print("[INFO] Starting from the beginning")         clients = get_clients(data)      # 3. 执行 API 调用循环     results = []     success_count = 0     failed_count = 0      for client in clients:         print(f"[PROGRESS] Processing ClientId: {client['ClientId']} ({client['ClientName']})")         is_success, payload = call_api(client)          if is_success:             print(f"✅ Success: {client['ClientId']} → {len(str(payload))} chars")             results.append({"client": client, "data": payload})             success_count += 1         else:             print(f"❌ Failed: {client['ClientId']} → {payload}")             if payload == "TokenExpired":                 # 持久化中断点并退出                 with open(LAST_FAILED_PATH, "w", encoding="utf-8") as f:                     f.write(client["ClientId"])                 print(f"[SAVED] Last failed ClientId saved to {LAST_FAILED_PATH}")                 break             failed_count += 1      else:         # 正常结束:无中断 → 清理断点文件         if LAST_FAILED_PATH.exists():             LAST_FAILED_PATH.unlink()             print("[CLEANED] last_failed.txt removed — all clients processed.")      # 4. 保存结果(可选)     if results:         output_file = f"results_{int(time.time())}.json"         with open(output_file, "w", encoding="utf-8") as f:             json.dump(results, f, indent=2, ensure_ascii=False)         print(f"[SAVED] {len(results)} results written to {output_file}")      print(f"n? Summary: Success={success_count}, Failed={failed_count}")  if __name__ == "__main__":     main()

关键注意事项

  • 安全性建议:AUTH_TOKEN 应通过环境变量(如 os.getenv(“API_TOKEN”))注入,切勿硬编码
  • 错误判断需适配真实 API:示例中基于 401/403 和 “Session expired” 字符串判断,你需根据实际响应体(如 {“code”: “TOKEN_EXPIRED”})调整 call_api() 中的条件分支;
  • 幂等性保障:确保被调用的 API 支持幂等(如 GET 查询天然幂等),若涉及 POST/PUT,请添加唯一请求 ID 或服务端去重逻辑;
  • 并发与限流:生产环境建议加入 time.sleep() 或使用 ratelimit 库控制请求频率,避免触发风控;
  • 日志增强:可集成 Logging 模块替代 print(),便于后续排查与监控。

该方案简洁、可靠、无外部依赖,已在多个企业级数据同步场景中验证有效。只需替换 URL、鉴权方式与错误判定逻辑,即可无缝接入你的业务流程。

text=ZqhQzanResources