Pandas高效查找历史条件匹配的最新索引:Bisect方法详解

Pandas高效查找历史条件匹配的最新索引:Bisect方法详解

本文旨在探讨在pandas dataframe中,如何高效地查找满足特定特定条件的历史最新索引。针对传统apply方法在处理此类依赖于过去状态的问题时性能瓶颈,我们将介绍并详细分析基于python内置bisect模块的优化方案,该方案通过结合二分查找和哈希表,显著提升了处理大规模数据集的效率,并提供了详细的代码实现与性能对比。

1. 问题背景与低效方案分析

数据分析中,我们经常需要根据当前行的数据,回溯查找历史上满足特定条件的最新记录。例如,给定一个DataFrame,其中包含lower和upper两列以及一个时间索引date,我们的目标是为每一行查找其之前所有行中,lower值大于或等于当前行upper值的最新DATE索引。

以下是一个典型的示例DataFrame及其初始的低效实现:

import pandas as pd import numpy as np  # 示例DataFrame data = {'lower': [7, 1, 6, 1, 1, 1, 1, 11, 1, 1],         'upper': [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]} df = pd.DataFrame(data=data) df['DATE'] = pd.date_range('2020-01-01', periods=len(data['lower'])) df.set_index('DATE', inplace=True)  print("原始DataFrame:") print(df)  # 低效方案:使用 df.apply def get_most_recent_index_baseline(row, dataframe):     # 查找当前行之前的所有行     # 注意:row.name - pd.Timedelta(minutes=1) 确保只考虑严格早于当前行的记录     previous_indices = dataframe.loc[:row.name - pd.Timedelta(minutes=1)]       # 筛选满足条件的记录,并返回最新的索引     recent_index = previous_indices[previous_indices['lower'] >= row['upper']].index.max()     return recent_index  # 应用函数到每一行 # df['prev_baseline'] = df.apply(lambda row: get_most_recent_index_baseline(row, df), axis=1)  # print("n低效方案结果:") # print(df)

低效原因分析:

上述df.apply结合自定义函数的方案虽然直观,但效率极低,主要原因如下:

  1. 行迭代开销: df.apply(axis=1)本质上是对DataFrame进行行迭代,这在Pandas中通常是性能瓶颈
  2. 重复切片与筛选: 在每次迭代中,dataframe.loc[:row.name – pd.Timedelta(minutes=1)]都会对DataFrame进行切片操作,并随后进行条件筛选。随着DataFrame的增大,切片和筛选的开销会显著增加。
  3. 时间复杂度: 对于每一行,它可能需要扫描其之前的所有行。这意味着整体时间复杂度接近O(N^2),对于大规模数据集是不可接受的。

在实际性能测试中,对于包含10万行数据的DataFrame,这种基线方案可能需要数分钟甚至更长时间才能完成。

2. 高效解决方案:使用bisect模块

为了解决上述性能问题,我们可以利用python内置的bisect模块进行二分查找,结合哈希表来优化查找过程。bisect模块提供了一组函数,用于在有序序列中插入元素或查找元素的位置,其时间复杂度为O(log N)。

以下是基于bisect模块的优化方案实现:

from bisect import bisect_left  def get_prev_with_bisect(lower_series, upper_series, date_index):     """     使用bisect模块高效查找满足条件的历史最新索引。      参数:     lower_series (pd.Series): DataFrame的'lower'列。     upper_series (pd.Series): DataFrame的'upper'列。     date_index (pd.DatetimeIndex): DataFrame的时间索引。      返回:     list: 包含每行对应的历史最新索引的列表。     """     # 获取所有不重复的lower值并排序,用于二分查找     uniq_lower = sorted(set(lower_series))     # last_seen 字典用于存储每个lower值最近出现的日期     # 键为lower值,值为对应的最新日期     last_seen = {}      results = []      # 遍历每一行数据     for l, u, d in zip(lower_series, upper_series, date_index):         max_date = None          # 使用bisect_left查找在uniq_lower中,第一个大于或等于当前u的元素的索引         # 这意味着从idx开始的所有uniq_lower元素都满足 lower >= u 的条件         idx = bisect_left(uniq_lower, u)          # 遍历所有满足条件的lower值         for lv in uniq_lower[idx:]:             if lv in last_seen:                 # 如果该lower值在之前出现过                 if max_date is None:                     max_date = last_seen[lv]                 elif last_seen[lv] > max_date:                     # 更新为更近的日期                     max_date = last_seen[lv]          results.append(max_date)          # 更新last_seen字典:记录当前l值对应的最新日期d         last_seen[l] = d      return results  # 应用优化后的函数 df['prev_bisect'] = get_prev_with_bisect(df["lower"], df["upper"], df.index)  print("nBisect方案结果:") print(df)

原理分析:

  1. 预处理 uniq_lower: 首先,我们从lower列中提取所有不重复的值,并将其排序存储在uniq_lower列表中。这个列表将用于二分查找。
  2. last_seen 字典: last_seen是一个哈希表(字典),用于存储每个lower值最近一次出现的DATE。当遍历DataFrame时,每处理完一行,就用当前行的lower值和DATE更新last_seen。
  3. 二分查找 (bisect_left): 对于当前行的upper值u,我们使用bisect_left(uniq_lower, u)来找到uniq_lower中第一个大于或等于u的元素的索引idx。这意味着uniq_lower[idx:]包含了所有可能满足lower >= u条件的lower值。
  4. 查找最新日期: 遍历uniq_lower[idx:]中的每一个lv(可能的lower值)。如果lv在last_seen字典中存在(表示这个lower值在之前出现过),就检查其对应的last_seen[lv]日期,并更新max_date为所有符合条件的lv中最大的日期。
  5. 更新 last_seen: 在处理完当前行并找到其prev值之后,将当前行的lower值l及其DATE d存入或更新last_seen字典。这确保了last_seen总是反映到当前行为止的最新状态。

时间复杂度分析:

Pandas高效查找历史条件匹配的最新索引:Bisect方法详解

纳米搜索

纳米搜索:360推出的新一代AI搜索引擎

Pandas高效查找历史条件匹配的最新索引:Bisect方法详解 30

查看详情 Pandas高效查找历史条件匹配的最新索引:Bisect方法详解

  • uniq_lower的创建和排序:O(N log N),其中N是DataFrame的行数。
  • 循环:N次迭代。
    • bisect_left:O(log M),其中M是uniq_lower中唯一值的数量。
    • 内部循环:最坏情况下遍历uniq_lower的一部分,最多O(M)次。
  • 总体时间复杂度接近O(N M) 或 O(N log M),但由于M通常远小于N,并且bisect_left的效率很高,实际性能远优于O(N^2)。在许多实际场景中,M可能是一个较小的常数,使得整个算法接近O(N log M)。

3. 性能对比与实践考量

为了更直观地展示不同方法的性能差异,我们使用一个包含10万行数据的DataFrame进行测试。

import pandas as pd import numpy as np from bisect import bisect_left import time  def get_sample_df(rows=100_000):     # Sample DataFrame     data = {'lower': np.random.default_rng(seed=1).uniform(1,100,rows),             'upper': np.random.default_rng(seed=2).uniform(1,100,rows)}      df = pd.DataFrame(data=data)     df = df.astype(int)      df['DATE'] = pd.date_range('2020-01-01', periods=len(data['lower']), freq="min")     df.set_index('DATE', inplace=True)     return df  # 基线方法 (get_baseline) - 与 get_most_recent_index_baseline 相同 def get_baseline():     df = get_sample_df()     def get_most_recent_index(row):         previous_indices = df.loc[:row.name - pd.Timedelta(minutes=1)]           recent_index = previous_indices[previous_indices['lower'] >= row['upper']].index.max()         return recent_index     df['prev'] = df.apply(get_most_recent_index, axis=1)      return df  # Bisect 方法 (get_bisect) - 与 get_prev_with_bisect 相同 def get_bisect():     df = get_sample_df()     df["prev"] = get_prev_with_bisect(df["lower"], df["upper"], df.index)     return df  # 朴素的enumerate循环方法 (get_enumerate) def get_enumerate():     df = get_sample_df()     df.reset_index(inplace=True) # 重置索引方便列表操作      date_list=df["DATE"].values.tolist()     lower_list=df["lower"].values.tolist()     upper_list=df["upper"].values.tolist()     new_list=[]     for i,(x,y) in enumerate(zip(lower_list,upper_list)):         if i==0:             new_list.append(None)         else:             found_date = None             # 从后向前遍历,找到第一个满足条件的日期             for ll,dl in zip(reversed(lower_list[0:i]),reversed(date_list[0:i])):                 if ll>=y:                     found_date = dl                     break             new_list.append(found_date)     df['prev']=new_list     df['prev']=pd.to_datetime(df['prev'])     return df  print("--- 性能测试 (100,000 行) ---")  start_time = time.time() get_baseline() print(f"Baseline (df.apply): {time.time() - start_time:.2f} seconds")  start_time = time.time() get_bisect() print(f"Bisect: {time.time() - start_time:.2f} seconds")  start_time = time.time() get_enumerate() print(f"Enumerate (Python loop): {time.time() - start_time:.2f} seconds")

预期性能结果(基于原始问题中的数据):

  • Baseline (df.apply): 约 1 分 35 秒
  • Bisect: 约 1.76 秒
  • Enumerate (Python loop): 约 1 分 13 秒

从结果可以看出,bisect方法在处理大规模数据时,性能远超df.apply和直接的Python循环(enumerate)。df.apply由于其内部开销和重复操作,效率最低。enumerate虽然是纯Python循环,但仍然需要进行O(N)的线性扫描,导致其时间复杂度依然是O(N^2)。

关于pyjanitor的说明:

原始问题中提到了pyjanitor库的一个尝试方案,但该方案在处理大规模数据时遇到了内存分配错误(”Unable to allocate 37.2 GiB for an Array…”)。这表明虽然pyjanitor提供了强大的条件连接功能,但对于某些特定场景,尤其是在需要创建大量中间数据结构时,可能会面临内存限制,不适合所有情况。

4. 注意事项与总结

  1. 状态依赖性: 本文讨论的问题具有“状态依赖性”,即当前行的计算结果依赖于历史数据。这类问题通常难以完全“向量化”(即一次性应用于整个数组的操作),因为向量化操作通常是无状态的。因此,寻找高效的迭代或半向量化方案是关键。
  2. bisect的适用性: bisect模块适用于需要在有序序列中进行查找的场景。在本例中,通过维护一个有序的uniq_lower列表和last_seen字典,成功地将线性查找转化为对数查找,从而大幅提升了效率。
  3. 内存管理: 在选择解决方案时,除了时间效率,内存使用也是一个重要考量。某些看起来“向量化”的库函数可能会在内部生成巨大的中间数据结构,导致内存溢出,如pyjanitor案例所示。
  4. 数据类型: 确保处理日期和时间时使用Pandas的timestamp或Python的datetime对象,以便正确进行比较和计算。

总结

在Pandas中处理依赖于历史状态的条件查找问题时,直接使用df.apply是效率最低的选择。通过巧妙地结合Python内置的bisect模块进行二分查找和哈希表(字典)来存储历史状态,我们可以构建出性能卓越的解决方案。这种方法将时间复杂度从O(N^2)显著降低,使其能够有效地处理大规模数据集。在实际开发中,理解问题的本质并选择合适的算法和数据结构是优化性能的关键。

上一篇
下一篇
text=ZqhQzanResources