高效实现带去重逻辑的滚动均值计算(面试题解析与优化方案)

6次阅读

高效实现带去重逻辑的滚动均值计算(面试题解析与优化方案)

本文详解如何在时间序列数据中按时间点动态计算滚动均值,同时确保每个名称仅保留最新一次出现的值参与计算,兼顾算法效率与代码可读性。

本文详解如何在时间序列数据中按时间点动态计算滚动均值,同时确保每个名称仅保留最新一次出现的值参与计算,兼顾算法效率与代码可读性。

数据分析与算法面试中,“带条件的滚动统计”是一类典型问题——它不仅考察对基础聚合操作的理解,更检验候选人对数据去重逻辑、窗口扩展策略及时间复杂度控制的综合能力。本题要求:对按时间排序的数据,为每个时间点 t 计算“截至 t 的所有记录中,每个 name 仅取其最后一次出现的 val 值”后的均值”。关键约束在于:不能重复全量遍历历史数据(即避免 O(n²) 复杂度),需设计增量或准增量式解法**。

以下提供两种专业级实现方案,均基于 pandas(主流且易验证),但思路可轻松迁移到纯 Python 或 sql 环境:

✅ 方案一:分组 + 累积去重(推荐|清晰高效)

核心思想:先按 time 分组,对每组内 names 做 last 去重 → 再对各时间点前缀(含当前)的所有去重结果合并 → 最后按时间点聚合均值。

import pandas as pd  data = pd.DataFrame({     'time': [1, 1, 1, 2, 2, 2],     'names': ["Andy", "Bob", "Karen", "Andy", "Matt", "Sim"],     'val': [1, 2, 3, 5, 6, 8] })  # 步骤1:对每个 time 组内,按 names 保留最后出现的 val(隐含时间先后顺序) # 注意:若原始数据未按 time 排序,务必先 sort_values(['time', 'names'], kind='stable') grouped_last = data.groupby('time').apply(     lambda g: g.drop_duplicates(subset='names', keep='last')[['names', 'val']] ).reset_index(drop=True)  # 步骤2:构建“截至每个 time”的累积视图(模拟滚动窗口) cumulative_records = [] seen_names = set() for t in sorted(data['time'].unique()):     # 取出 time <= t 的所有记录,并按 names 逆序去重(保证最新覆盖旧值)     window = data[data['time'] <= t].sort_values('time', ascending=True)     # 关键:drop_duplicates(keep='last') 在已排序的 window 中等价于取每个 name 的最新 val     latest_in_window = window.drop_duplicates(subset='names', keep='last')     cumulative_records.append(latest_in_window)  # 步骤3:合并并计算各 time 点均值 result_df = pd.concat(cumulative_records, ignore_index=True) means = result_df.groupby('time')['val'].mean().to_dict()  print(means)  # {1: 2.0, 2: 4.8}

✅ 方案二:字典状态维护(极致高效|O(n) 时间复杂度)

适用于大数据流或内存敏感场景。用字典 latest_vals 动态追踪每个 name 的最新 val,遍历时间点时实时更新并累加均值:

def rolling_mean_no_duplicate_names(df):     df_sorted = df.sort_values('time')  # 必须保证时间有序     latest_vals = {}  # name -> latest val     means = {}      for _, row in df_sorted.iterrows():         # 更新该 name 的最新值         latest_vals[row['names']] = row['val']         # 当前时间点的所有最新值均值         current_mean = sum(latest_vals.values()) / len(latest_vals)         means[row['time']] = round(current_mean, 1)  # 可选精度控制      return means  print(rolling_mean_no_duplicate_names(data))  # {1: 2.0, 2: 4.8}

⚠️ 注意事项与进阶提示

  • 数据顺序至关重要:drop_duplicates(keep=’last’) 依赖行序。若原始数据中同一 time 内 names 出现顺序不反映业务时效性,需额外定义排序键(如添加 timestamp 列)。
  • 空值处理:实际场景中需检查 val 是否为 NaN,建议在 .mean() 前添加 dropna=True。
  • 扩展性思考:若需支持“最近 N 次”而非“最后一次”,可改用 collections.deque 维护每个 name 的滑动值队列。
  • 面试表达重点:优先说明方案二的时间复杂度优势(单次遍历 O(n)),再补充方案一的可读性与可调试性;强调“状态维护”是解决此类滚动+去重问题的核心范式。

掌握这种“滚动窗口 + 键级状态更新”的建模思维,不仅能应对类似面试题,更是构建实时特征工程管道的关键能力。

text=ZqhQzanResources