Pandas 中实现带最小窗口约束与条件重置的滚动均值指示器

10次阅读

Pandas 中实现带最小窗口约束与条件重置的滚动均值指示器

本文介绍如何为 pandes dataframe 的每列构建一个动态符号指示器(1 或 -1),该指示器在当前值 ≥ 指定倍数 × 当前滚动均值时翻转,并强制要求每次重置前滚动窗口至少包含指定最小观测数。

在时间序列分析或信号检测类任务中,常需根据局部统计量(如滚动均值)的动态变化触发状态切换。本教程解决一个典型场景:对每一列独立维护一个累积滚动均值窗口,当某行值满足 value ≥ multiple × current_rolling_mean 且窗口长度已达最小阈值(min_count)时,重置窗口并翻转指示器符号(+1 ↔ −1)。该逻辑无法直接通过 pd.Series.rolling().mean() 实现,因其依赖状态记忆(当前符号、累计和、计数),需逐行迭代处理。

以下是一个清晰、可复用的纯 python 实现:

import pandas as pd  def rolling_mean_indicator(col, start=1, multiple=2, min_count=4):     """     生成滚动均值触发的符号指示器序列。      Parameters:     -----------     col : pd.Series or array-like         输入列数据     start : int, default 1         初始指示器值(1 或 -1)     multiple : float, default 2         触发翻转的倍数阈值     min_count : int, default 4         允许触发重置所需的最小连续观测数      Yields:     -------     int : 当前行对应的指示器值(1 或 -1)     """     curr = start     num_obs = 0     acc = 0.0      for v in col:         acc += v         num_obs += 1          if num_obs < min_count:             yield curr             continue          mean_val = acc / num_obs         if v >= multiple * mean_val:             curr *= -1             num_obs = 0             acc = 0.0          yield curr  # 示例数据 df = pd.DataFrame({     "A": [0.1, 0.1, 0.15, 0.1, 0.1, 0.7, 0.1, 0.1, 0.5, 1, 0.1, 0.1],     "B": [0.1, 0.1, 0.4, 0.1, 0.8, 0.1, 0.1, 0.1, 0.1, 0.1, 0.9, 0.1], })  # 应用函数 df["A_ind"] = list(rolling_mean_indicator(df["A"])) df["B_ind"] = list(rolling_mean_indicator(df["B"]))  print(df[["A", "B", "A_ind", "B_ind"]])

输出结果与预期一致:

A    B  A_ind  B_ind 0  0.10  0.1      1      1 1  0.10  0.1      1      1 2  0.15  0.4      1      1 3  0.10  0.1      1      1 4  0.10  0.8      1     -1 5  0.70  0.1     -1     -1 6  0.10  0.1     -1     -1 7  0.10  0.1     -1     -1 8  0.50  0.1     -1     -1 9  1.00  0.1      1     -1 10 0.10  0.9      1      1 11 0.10  0.1      1      1

关键逻辑说明:

  • 窗口是累积型(非滑动窗):从上一次重置后首行开始累加,直到触发条件才清空;
  • min_count 是硬性约束:即使满足 v ≥ multiple × mean,若 num_obs
  • 重置即清零 acc 和 num_obs,下一行重新开始累积(而非跳过);
  • 指示器仅在重置发生时翻转,其余时间保持当前值。

性能优化建议(大数据集必选):
对万行以上数据,推荐使用 numba JIT 加速。只需添加 @njit 装饰器并返回 numpy 数组:

from numba import njit import numpy as np  @njit def rolling_mean_indicator_numba(col, start=1, multiple=2, min_count=4):     curr = start     num_obs = 0     acc = 0.0     out = np.empty(len(col), dtype=np.int8)      for i in range(len(col)):         v = col[i]         acc += v         num_obs += 1          if num_obs < min_count:             out[i] = curr             continue          mean_val = acc / num_obs         if v >= multiple * mean_val:             curr *= -1             num_obs = 0             acc = 0.0          out[i] = curr      return out  # 使用方式(注意传入 .values) df["A_ind_fast"] = rolling_mean_indicator_numba(df["A"].values) df["B_ind_fast"] = rolling_mean_indicator_numba(df["B"].values)

此方案兼顾可读性与工程实用性,适用于金融信号生成、异常脉冲检测、自适应阈值控制系统等场景。注意:该逻辑本质为在线单次遍历算法,不支持向量化回溯,但正因如此,它天然适配流式数据处理。

text=ZqhQzanResources