如何使用 Ajax 轮询 Celery 任务状态并实时更新 Django 前端

3次阅读

如何使用 Ajax 轮询 Celery 任务状态并实时更新 Django 前端

本文详解如何在 django 中结合 celery 异步任务与 ajax 轮询,实现数据库填充过程的实时进度反馈——包括后端状态判断、json 响应设计、前端递归轮询及 dom 动态更新。

在构建需长时间运行后台任务(如批量数据导入、报表生成)的 django 应用时,用户不应面对空白页或无响应等待。Celery 提供了强大的异步执行能力,而 ajax 轮询则是轻量、兼容性好且无需 websocket 基础设施的实时状态同步方案。本文将带你完整实现:启动 Celery 任务 → 前端持续轮询 → 根据任务状态(PROGRESS/SUCCESS/FAILURE)动态渲染进度或结果

✅ 后端:提供结构化状态接口

关键在于 current_data 视图必须返回可被前端明确解析的状态标识,而非仅依赖 AsyncResult.get()(该方法会阻塞直至完成,违背轮询本意)。应始终使用 async_result.state 判断当前状态,并按需返回阶段性数据(如已处理条数、最新统计摘要等),而非仅最终结果。

# views.py from celery import current_app from django.http import JsonResponse from django.shortcuts import render  def dashboard(request):     # 启动任务(确保 task 已配置 track_started=True 和 update_state())     result = prepare_database.delay()     return render(request, 'appname/template.html', {'task_id': result.id})  def current_data(request):     task_id = request.GET.get('task_id')     if not task_id:         return JsonResponse({'status': 'no_task'}, status=400)      async_result = current_app.AsyncResult(task_id)      # 根据 Celery 内置状态码返回结构化响应     if async_result.state == 'PENDING':         return JsonResponse({'status': 'pending', 'progress': 0})     elif async_result.state == 'PROGRESS':         # 假设你的 task 在 update_state() 中传递了 'processed' 字段         progress_data = async_result.info or {}         return JsonResponse({             'status': 'in_progress',             'progress': progress_data.get('processed', 0),             'message': progress_data.get('message', 'Processing...')         })     elif async_result.state == 'SUCCESS':         # 此处可返回最终结果摘要,或引导前端跳转/刷新         return JsonResponse({             'status': 'completed',             'data': async_result.result  # 或从 DB 查询最新数据         })     elif async_result.state in ['FAILURE', 'REVOKED']:         return JsonResponse({             'status': 'error',             'reason': str(async_result.info) if async_result.info else 'Task failed'         })     else:         return JsonResponse({'status': 'unknown', 'state': async_result.state})

⚠️ 注意事项: 确保 Celery Task 显式调用 self.update_state(state=’PROGRESS’, meta={…})(需在 @shared_task(bind=True) 下); AsyncResult.info 是 meta 参数的镜像,用于传递进度细节; 避免在 current_data 中调用 async_result.get(),它会阻塞请求线程,导致轮询积。

✅ 前端:递归轮询 + 智能 DOM 更新

前端使用 setTimeout 实现非阻塞递归轮询,并依据响应 status 字段执行不同逻辑:显示加载动画、更新进度条、插入实时数据或终止轮询。

<!-- template.html --> <div id="status-area">   <p id="status-text">Initializing...</p>   <div id="progress-bar" style="display:none;">     <progress id="progress" value="0" max="100"></progress>     <span id="progress-text">0%</span>   </div>   <div id="result-area" style="display:none;"></div> </div>  <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script> <script> function pollTaskStatus(taskId) {   $.ajax({     url: '/current_data/',     method: 'GET',     data: { task_id: taskId },     dataType: 'json',     timeout: 10000, // 防止网络异常卡死     success: function(response) {       const $statusText = $('#status-text');       const $progressBar = $('#progress-bar');       const $progress = $('#progress');       const $progressText = $('#progress-text');       const $resultArea = $('#result-area');        switch(response.status) {         case 'pending':           $statusText.text('Task queued, waiting to start...');           $progressBar.hide();           $resultArea.hide();           setTimeout(() => pollTaskStatus(taskId), 2000);           break;          case 'in_progress':           $statusText.text(`Processing: ${response.message}`);           $progressBar.show();           // 示例:假设 processed 是 0~100 的百分比值           const pct = Math.min(100, response.progress || 0);           $progress.val(pct);           $progressText.text(`${pct}%`);           setTimeout(() => pollTaskStatus(taskId), 1500); // 动态调整间隔更优           break;          case 'completed':           $statusText.text('✅ Task completed!');           $progressBar.hide();           $resultArea.html(`<pre class="brush:php;toolbar:false;">${JSON.stringify(response.data, null, 2)}

`).show(); break; case ‘error’: $statusText.html(`❌ Error: ${response.reason}`); $progressBar.hide(); $resultArea.hide(); break; default: $statusText.text(`Unknown status: ${response.status}`); } }, error: function(xhr, status, error) { if (status === ‘timeout’) { $statusText.text(‘⚠️ Request timed out. Retrying…’); } else { $statusText.text(`Network error: ${error}`); } setTimeout(() => pollTaskStatus(taskId), 3000); } }); } $(document).ready(function() { const taskId = ‘{{ task_id }}’; if (taskId) { pollTaskStatus(taskId); } });

? 总结与进阶建议

  • 轮询不是银弹:高频轮询(
  • 状态一致性:Celery 的 PROGRESS 状态需配合 update_state() 主动上报,否则前端永远收不到中间态。
  • 用户体验优化:添加加载骨架屏、禁用重复提交按钮、提供手动刷新入口,让交互更健壮。
  • 安全加固:在 current_data 中校验 task_id 来源(如关联用户 session)、限制单 IP 轮询频率,防止滥用。

通过以上实现,你已构建出一套简洁、可靠、可扩展的 Celery 任务状态同步方案——无需复杂基础设施,即可为用户提供清晰、及时的任务可视化体验。

立即学习前端免费学习笔记(深入)”;

text=ZqhQzanResources