Python 实现
我喜欢以干净简单的方式开始这些事情。因此,我只是编写了一个简单的类,它完全可以满足需要,而没有过多考虑优化。我称它为Interpolator,因为这在我看来像是分类插值。
class Interpolator:
def __init__(self, data):
self.data = data
self.current_idx = 0
self.current_nan_region_start = None
self.result = None
self.maxgap = 1
def run(self, maxgap=2):
# Initialization
self.result = [None] * len(self.data)
self.maxgap = maxgap
self.current_nan_region_start = None
prev_isnan = 0
for idx, item in enumerate(self.data):
isnan = item is None
self.current_idx = idx
if isnan:
if prev_isnan:
# Result is already filled with empty data.
# Do nothing.
continue
else:
self.entered_nan_region()
prev_isnan = 1
else: # not nan
if prev_isnan:
self.exited_nan_region()
prev_isnan = 0
else:
self.continuing_in_categorical_region()
def entered_nan_region(self):
self.current_nan_region_start = self.current_idx
def continuing_in_categorical_region(self):
self.result[self.current_idx] = self.data[self.current_idx]
def exited_nan_region(self):
nan_region_end = self.current_idx - 1
nan_region_length = nan_region_end - self.current_nan_region_start + 1
# Always copy the empty region endpoint even if gap is not filled
self.result[self.current_idx] = self.data[self.current_idx]
if nan_region_length > self.maxgap:
# Do not interpolate as exceeding maxgap
return
if self.current_nan_region_start == 0:
# Special case. data starts with "None"
# -> Cannot interpolate
return
if self.data[self.current_nan_region_start - 1] != self.data[self.current_idx]:
# Do not fill as both ends of missing data
# region do not have same value
return
# Fill the gap
for idx in range(self.current_nan_region_start, self.current_idx):
self.result[idx] = self.data[self.current_idx]
def interpolate(data, maxgap=2):
"""
Interpolate categorical variables over missing
values (None's).
Parameters
----------
data: list of objects
The data to interpolate. Holds
categorical data, such as 'cat', 'dog'
or 108. None is handled as missing data.
maxgap: int
The maximum gap to interpolate over.
For example, with maxgap=2, ['car', None,
None, 'car', None, None, None, 'car']
would become ['car', 'car', 'car' 'car',
None, None None, 'car'].
Note: Interpolation will only occur on missing
data regions where both ends contain the same value.
For example, [1, None, 2, None, 2] will become
[1, None, 2, 2, 2].
"""
interpolator = Interpolator(data)
interpolator.run(maxgap=maxgap)
return interpolator.result
这就是使用它的方式(下面是get_data() 的代码):
data = get_data(k=100)
interpolated_data = interpolate(data)
复制粘贴 Cython 实现
很可能 python 实现足够快,因为数组大小为 1000.000,在我的笔记本电脑上处理数据所需的时间为 0.504 秒。无论如何,creating Cython versions 很有趣,并且可能会提供额外的时间奖励。
所需步骤:
- 将python实现复制粘贴到新文件中,名为
fast_categorical_interpolate.pyx
- 在同一文件夹下创建
setup.py,内容如下:
from setuptools import setup
from Cython.Build import cythonize
setup(
ext_modules=cythonize(
"fast_categorical_interpolate.pyx",
language_level="3",
),
)
- 运行
python setup.py build_ext --inplace 以构建 Cython 扩展。您会在同一文件夹中看到类似 fast_categorical_interpolate.cp38-win_amd64.pyd 的内容。
- 现在,您可以像这样使用插值器:
import fast_categorical_interpolate as fpi
data = get_data(k=100)
interpolated_data = fpi.interpolate(data)
- 当然,您可以在 Cython 代码中进行一些优化以使其更快,但在我的机器上,当 N=1000.000 时速度提高了 38%,当 N=10.000 时提高了 126% .
我的机器上的计时
- 当 N=100(列表中的项目数)时,python 实现大约是 160x,Cython 实现比
smooth 快大约 250x
In [8]: timeit smooth(test_df, delay=2)
10.2 ms ± 669 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [9]: timeit interpolate(data)
64.8 µs ± 7.39 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
In [10]: timeit fpi.interpolate(data)
41.3 µs ± 4.64 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
- 当 N=10.000 时,时序差异约为 190x (Python) 到 302x (Cython)。
In [5]: timeit smooth(test_df, delay=2)
1.08 s ± 166 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [6]: timeit interpolate(data)
5.69 ms ± 852 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [7]: timeit fpi.interpolate(data)
3.57 ms ± 377 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
- 当 N=1000.000 时,python 实现大约快 210 倍,Cython 实现大约快 287 倍。
In [9]: timeit smooth(test_df, delay=2)
1min 45s ± 24.2 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [10]: timeit interpolate(data)
504 ms ± 67.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [11]: timeit fpi.interpolate(data)
365 ms ± 38 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
附录
测试数据创建者get_data()
import random
random.seed(0)
def get_data(k=100):
return random.choices(population=[None, "ZoneA", "ZoneB"], weights=[4, 3, 2], k=k)
用于测试的函数和测试数据smooth()
import pandas as pd
data = get_data(k=1000)
test_df = pd.DataFrame(dict(landlot=data)).fillna("None")
def smooth(df: pd.DataFrame, delay=2) -> pd.DataFrame:
"""
Args:
df (pd.DataFrame): dataframe with landlot column to smooth.
Returns:dataframe smoothed
"""
df_iter = df
last = "None"
last_index = 0
for num, line in df_iter.iterrows():
if (
(line.landlot != "None")
and (line.landlot == last)
and (num - last_index <= delay)
and (df_iter.iloc[(num - 1), df_iter.columns.get_loc("landlot")] == "None")
):
df_iter.iloc[
last_index : (num + 1), # noqa: E203
df_iter.columns.get_loc("landlot"),
] = last
if line.landlot != "None":
last = line.landlot
last_index = num
return df_iter
注意“当前代码”
我认为某处一定存在一些复制粘贴错误,因为“当前代码”不能正常工作。我用 delay=2 关键字参数替换了 self.delay 以指示最大间隙。我认为这是应该的。即使这样,逻辑也不能与您提供的简单示例数据正确工作。