如何在 Token 过期后从中断位置恢复 JSON 迭代任务

1次阅读

如何在 Token 过期后从中断位置恢复 JSON 迭代任务

本文介绍一种健壮的断点续传机制:当调用 api 过程中因 Token 失效中断时,自动记录最后失败的 clientid,并在下次运行时从此处继续迭代,避免重复请求和数据丢失

在批量处理 jsON 数据并逐个调用外部 API 的场景中(如按 ClientId 查询客户信息),Token 有效期限制常导致任务中途失败。若每次失败后都从头重跑,不仅浪费资源、延长执行时间,还可能引发接口限流或数据重复写入等问题。理想的解决方案是实现可持久化状态的断点续传——即准确记住上一次中断的位置,并精准恢复。

核心设计思路

我们不依赖内存中的索引变量(易丢失),而是采用外部状态文件(如 last_failed.txt)持久化记录最后一次失败的 ClientId。程序启动时优先检查该文件:

  • 若文件不存在 → 从第一个 Client 开始遍历;
  • 若文件存在 → 从文件中记录的 ClientId 开始(含) 继续遍历,跳过此前已成功处理的部分。

该策略的关键在于:“从指定 ClientId 恢复” ≠ “跳过该 Client”,而是包含它——因为上一次正是在此 Client 触发了 Token 过期,需重新尝试该请求。

实现要点与代码详解

以下是一个生产就绪的参考实现(已适配原 json 结构):

import json import pathlib import requests import time  # ✅ 配置项:请根据实际环境修改 API_BASE_URL = "https://api.example.com/clients/"  # 示例 URL 模板 AUTH_TOKEN = "your_bearer_token_here" LAST_FAILED_PATH = pathlib.Path("last_failed.txt")  def load_clients_from_json(filepath: str):     """安全加载并扁平化解析 ClientList 数组"""     with open(filepath, encoding="utf-8") as f:         data = json.load(f)     clients = []     for item in data:         if "ClientList" in item and isinstance(item["ClientList"], list):             clients.extend(item["ClientList"])     return clients  def find_client_start_index(clients: list, target_id: str) -> int:     """线性查找目标 ClientId 的索引(支持重复 ID 场景)"""     for i, client in enumerate(clients):         if client.get("ClientId") == target_id:             return i     raise ValueError(f"ClientId '{target_id}' not found in client list")  def call_api_with_retry(client: dict) -> tuple[bool, str]:     """封装 API 调用逻辑,返回 (是否成功, 响应文本/错误信息)"""     url = f"{API_BASE_URL}{client['ClientId']}"     headers = {         "Authorization": f"Bearer {AUTH_TOKEN}",         "Cache-Control": "no-cache",     }      try:         response = requests.get(url, headers=headers, timeout=30)          if response.status_code == 200:             return True, response.text         elif response.status_code == 401 or response.status_code == 403:             # ⚠️ 典型 Token 过期响应码(依实际 API 调整)             err_data = response.json()             if "expired" in err_data.get("message", "").lower() or "invalid token" in err_data.get("Error", "").lower():                 return False, "Token expired"         # 其他错误(如 500、404)视为业务失败,不中断流程         return False, f"HTTP {response.status_code}: {response.text[:200]}"      except requests.exceptions.RequestException as e:         return False, f"Request failed: {str(e)}"     except json.JSONDecodeError:         return False, f"Invalid JSON response: {response.text[:200]}"  def main():     # Step 1: 加载所有客户端数据     clients = load_clients_from_json("client_file.json")     if not clients:         print("⚠️  Warning: No clients found in JSON file.")         return      # Step 2: 确定起始位置     start_idx = 0     if LAST_FAILED_PATH.exists():         try:             with open(LAST_FAILED_PATH) as f:                 last_id = f.read().strip()             start_idx = find_client_start_index(clients, last_id)             print(f"▶️  Resuming from ClientId: {last_id} (index {start_idx})")         except (ValueError, FileNotFoundError) as e:             print(f"⚠️  Failed to resume: {e}. Starting from beginning.")             LAST_FAILED_PATH.unlink(missing_ok=True)      # Step 3: 迭代调用 API     success_count = 0     failed_count = 0     start_time = time.time()      for i in range(start_idx, len(clients)):         client = clients[i]         client_id = client.get("ClientId", "N/A")         print(f"n? Processing ClientId: {client_id}")          is_success, result = call_api_with_retry(client)          if is_success:             success_count += 1             print(f"✅ Success | Response length: {len(result)} chars")         else:             failed_count += 1             print(f"❌ Failed | Reason: {result}")              # ? 关键:仅当 Token 过期时才保存断点并退出             if "Token expired" in result:                 print(f"? Token expired at ClientId '{client_id}'. Saving checkpoint...")                 with open(LAST_FAILED_PATH, "w") as f:                     f.write(client_id)                 print("⏹️  Execution paused. Rerun script to resume.")                 break     else:         # ✅ 正常完成全部迭代:清除断点文件         if LAST_FAILED_PATH.exists():             LAST_FAILED_PATH.unlink()             print("✅ All clients processed successfully. Checkpoint cleared.")      # Step 4: 输出统计摘要     elapsed = time.time() - start_time     print(f"n? Summary: {success_count} success, {failed_count} failed | Time: {elapsed:.2f}s")  if __name__ == "__main__":     main()

注意事项与最佳实践

  • Token 过期判定要精准:不同 API 返回的过期标识各异(如 401 Unauthorized、{“error”:”invalid_token”}、”session expired“)。务必根据实际响应调整 call_api_with_retry() 中的判断逻辑,避免误判导致意外中断。
  • 状态文件路径需可写:确保脚本对 last_failed.txt 所在目录有读写权限;生产环境建议使用绝对路径或配置化存储位置。
  • 幂等性保障:API 设计应支持重复请求(如 GET 查询类接口天然幂等)。若为 POST/PUT 类操作,请在服务端增加幂等 Key 或客户端做去重校验。
  • 异常兜底:示例中对 JSON 解析失败、网络超时等做了基础捕获。在关键业务中,建议增加日志框架(如 Logging)记录详细上下文,并接入告警。
  • 性能优化(可选):若 ClientList 极大(数万+),可将 find_client_start_index 替换为哈希映射预构建索引,加速定位。

总结

通过引入轻量级外部状态文件 + 清晰的起始索引定位逻辑,我们实现了稳定、可预测的断点续传能力。该方案无需数据库、不依赖外部服务,兼容性强,且易于集成到定时任务(如 cron)或 CI/CD 流水线中。记住核心原则:失败即存档,启动先检查,恢复即重试——让批量 API 调用真正具备韧性与可维护性。

text=ZqhQzanResources