【问题标题】:Speeding up iterative process with pandas dataframe使用 pandas 数据框加速迭代过程
【发布时间】:2017-09-17 19:05:09
【问题描述】:

我有一个大熊猫数据框df_gen包含 10000 个客户的时间序列数据。这些数据与能源使用有关。这是它的小版本

In[1]: df_gen   
Out[2]: 
                         10053802  10053856  10053898  10058054
2013-01-01 00:00:00     0.196     1.493     0.332     0.278
2013-01-01 00:30:00     0.155     1.497     0.336     0.275
2013-01-01 01:00:00     0.109     1.487       NaN     0.310
2013-01-01 01:30:00     0.703     1.479     0.331     0.272
2013-01-01 02:00:00     0.389     1.533     0.293     0.313

我有一个填充缺失数据的流程:对于特定客户 ID,它在特定时间戳有缺失数据,找到在整个数据集中具有最相似数据的时间戳,并使用它来填补空白。

使用这种方法的原因是能源使用取决于外部因素,例如外部温度,因此,例如在炎热的日子里,很多顾客都开着空调。如果我们找到大多数其他客户的能源使用与丢失数据点的日期和时间相似的日期和时间,那么这是填补缺失数据的好地方。

它使用一个函数通过计算每行的方差来识别数据与缺失数据的时间戳最匹配的时间戳:

def best_ts(df,ts_null,null_row):
# finds the timestamp for which the load is closest to the missing load at ts_null across the dataset df
# null_row is the row with the null data to be filled
var_df = pd.Series(index=df.index)
var_df.fillna(value=0, inplace=True)
if pd.isnull(null_row).all():
        logging.info('No customer data at all for %s ',str(ts_null))
var_df = ((df-null_row).fillna(value=0)**2).sum(axis=1)
smallest = var_df.idxmin()
return smallest

该脚本然后为每个客户和每个时间戳进行迭代,当它找到空数据时,它会调用 best_ts 并从该时间戳填充:

for id in df_gen.columns:
    for ts in df_gen.index:
        if pd.isnull(df_gen.loc[ts,id]):
        # slice df to remove rows that have no filling data for this customer and use this to fill from
        fill_ts = best_ts(df_gen[df_gen[id].notnull()],ts, df_gen.loc[ts])
        df_gen.loc[ts].fillna(df_gen.loc[fill_ts], inplace=True)

工作示例 使用上面的示例 df,当找到NaN 数据时,best_ts 被传递 3 个参数:删除缺失数据行的 df、缺失数据的时间戳以及作为 pandas Series

In: df_gen[df_gen[id].notnull()]
Out: 
                     10053802  10053856  10053898  10058054
2013-01-01 00:00:00     0.196     1.493     0.332     0.278
2013-01-01 00:30:00     0.155     1.497     0.336     0.275
2013-01-01 01:30:00     0.703     1.479     0.331     0.272
2013-01-01 02:00:00     0.389     1.533     0.293     0.313

In: ts
Out: 

datetime.datetime(2013, 1, 1, 1, 0)

In: df_gen.loc[ts]
Out: 
10053802    0.109
10053856    1.487
10053898      NaN
10058054    0.310

在函数中,使用与数据帧相同的 DateTimeIndex 创建了一个熊猫系列 var_df。每个值都是方差,即每个客户的能量值与时间戳ts 的能量值之间的平方差之和。

例如,var_df 中的第一个值由 ((0.196-0.109)^2 + (1.493-1.487)^2 + 0 + (0.278-0.310)^2) = 0.008629 给出

In: var_df
Out: 
2013-01-01 00:00:00    0.008629
2013-01-01 00:30:00    0.003441
2013-01-01 01:30:00    0.354344
2013-01-01 02:00:00    0.080525
dtype: float64

所以时间戳2013-01-01 00:30:00是最“喜欢”缺失数据的时间,所以选择这个来填补缺失的数据。

所以填充的数据框看起来像这样:

In: df_gen
Out: 
                     10053802  10053856  10053898  10058054
2013-01-01 00:00:00     0.196     1.493     0.332     0.278
2013-01-01 00:30:00     0.155     1.497     0.336     0.275
2013-01-01 01:00:00     0.109     1.487     0.336     0.310
2013-01-01 01:30:00     0.703     1.479     0.331     0.272
2013-01-01 02:00:00     0.389     1.533     0.293     0.313

(注意:在这个小例子中,“最佳”时间戳恰好是丢失数据之前的时间戳,但在完整数据集中,它可能是一年中 17519 个时间戳中的任何一个。)

这段代码有效,但是慢!通过数据集大约需要 2 个月!我希望通过避免嵌套迭代或加速函数来加快它的速度。

【问题讨论】:

  • 处理顺序重要吗?即我们需要在客户10006572之前做客户10006414吗?如果不是,您可以考虑使用多处理,否则使用 ctypes 可能会加快您的循环。
  • 您是否考虑过使用pandas.Dataframe.interpolate 而不是滚动您自己的插值算法?看来您正在尝试做 "nearest" 所做的事情,但我确信它会更有效地完成它
  • @jprockbelly - 不,顺序并不重要。对多处理一无所知,但 PC 有 16 个内核,所以绝对值得一探。
  • @juanpa.arrivillaga 我看过那个文档。我不认为“最近”做了我想做的事,尽管我不完全理解那里给出的不同方法。
  • 您没有提供足够的数据来证明问题。您也没有提供示例计算。期望您的代码记录您的想法通常不是一个好主意。请花时间指导我们进行计算。并向我们​​展示填补缺失的样子。

标签: python performance pandas dataframe iteration


【解决方案1】:

看起来您的相似性指标正在计算每列之间的元素平方距离之和。一种方法,诚然有点笨拙(但利用了快速的 Pandas 操作),是:

  1. 遍历每一列,并创建一个与原始数据框具有相同维度的新数据框,但其中每一列都是当前列的副本。
  2. 使用df.subtract().pow(2).sum() 计算相似度,忽略减去自身的列,找到最小值的列名称(即客户ID)。
  3. 用匹配列中的对应值更新当前列中的缺失值。

下面是一个粗略的草案,但它可能足以适应您的用例。此实现的一个重要假设是每个客户只能丢失一个数据点。该代码应该可以推广到每个客户的多个缺失数据点,只需做一些工作。因此,在测试此代码时,请确保随机生成的df 每列只有一个缺失数据点。 (通常会,但并非总是如此。)

生成样本数据

dates = pd.date_range('20170101', periods=10, freq='D')
ids = [10006414, 10006572, 10006630, 10006664, 10006674]
values = np.random.random(size=len(dates)*len(ids)).reshape(10,5)
df = pd.DataFrame(values, index=dates, columns=ids)

# insert random missing data
nan_size = 4
for _ in range(nan_size):
    nan_row = np.random.randint(0, df.shape[0])
    nan_col = np.random.randint(0, df.shape[1])
    df.iloc[nan_row, nan_col] = np.nan

执行匹配插值

def get_closest(customer, dims):
    cust = customer.name
    nrow = dims[0]
    ncol = dims[1]
    replace_row = df.index[df[cust].isnull()]
    # make data frame full of cust data
    df2 = pd.DataFrame(np.repeat(df.loc[:,cust], ncol).values.reshape(nrow,ncol), 
                       index=dates, columns=ids)
    replace_col = (df.subtract(df2)
                     .pow(2)
                     .sum()
                     .replace({0:np.nan}) # otherwise 0 will go to top of sort
                     .sort_values()
                     .index[0] # index here is matching customer id
                  )
    customer[replace_row] = df.ix[replace_row, replace_col]
    return customer

print(df.apply(get_closest, axis='rows', args=(df.shape,)))

更新
根据 OP 的说明,目标是进行逐行比较(即查找最相似的时间戳)而不是逐列比较(即查找最相似的客户)。下面是get_closest() 的更新版本,它可以进行逐行比较,并且可以顺利处理多个缺失值。

我还添加了一个报告功能,它将打印包含所有客户缺失条目的每个时间戳,以及用于估算缺失值的时间戳。报告默认关闭,只需在apply() 中传入True 作为第二个args 条目即可打开它。

更新 2
更新后的逐行 get_closest() 现在考虑了最近的时间戳也具有需要插补的客户列的 NaN 值的边缘情况。现在,该函数将搜索最近的时间戳,具有需要估算的缺失值的可用数据。

样本数据:

            10006414  10006572  10006630  10006664  10006674
2017-01-01  0.374593  0.982585  0.059732  0.513149  0.251808
2017-01-02  0.269229  0.998531  0.523589  0.780806  0.033106
2017-01-03  0.261173  0.828637  0.638376  0.314944  0.737646
2017-01-04  0.786112  0.101750  0.286983  0.242778  0.341717
2017-01-05  0.230358  0.387392  0.918353  0.206100       NaN
2017-01-06  0.715966  0.206121  0.153461  0.894511  0.765227
2017-01-07  0.095002  0.169697  0.465624  0.109404  0.212315
2017-01-08  0.474712       NaN  0.471861  0.773374  0.454295
2017-01-09       NaN  0.201928  0.228018  0.173968  0.248485
2017-01-10  0.542635       NaN  0.132974  0.692073  0.201721

ROW-WISE get_closest()

def get_closest(row, dims, report=False):
    if row.isnull().sum():
        ts_with_nan = row.name
        nrow, ncol = dims
        df2 = pd.DataFrame(np.tile(df.loc[ts_with_nan], nrow).reshape(nrow,ncol), 
                           index=df.index, columns=df.columns)
        most_similar_ts = (df.subtract(df2, axis='rows', fill_value=0)
                             .pow(2)
                             .sum(axis=1, skipna=True)
                             .sort_values()
                          )
        # remove current row from matched indices
        most_similar_ts = most_similar_ts[most_similar_ts.index != ts_with_nan] 
        # narrow down to only columns where replacements would occur
        match_vals = df.ix[most_similar_ts.index, df.loc[ts_with_nan].isnull()]
        # select only rows where all values are non-empty
        all_valid = match_vals.notnull().all(axis=1)
        # take the timestamp index of the first row of match_vals[all_valid]
        best_match = match_vals[all_valid].head(1).index[0]
        if report:
            print('MISSING VALUES found at timestamp: {}'.format(ts_with_nan.strftime('%Y-%m-%d %H:%M:%S')))
            print('            REPLACEMENT timestamp: {}'.format(best_match.strftime('%Y-%m-%d %H:%M:%S')))

        # replace missing values with matched data
        return row.fillna(df.loc[best_match])

    return row

df.apply(get_closest, axis='columns', args=(df.shape, True)) # report=True

输出:

# MISSING VALUES found at timestamp: 2017-01-02 00:00:00
            # REPLACEMENT timestamp: 2017-01-09 00:00:00
# MISSING VALUES found at timestamp: 2017-01-07 00:00:00
            # REPLACEMENT timestamp: 2017-01-10 00:00:00
# MISSING VALUES found at timestamp: 2017-01-09 00:00:00
            # REPLACEMENT timestamp: 2017-01-03 00:00:00

print(df)
            10006414  10006572  10006630  10006664  10006674
2017-01-01  0.374593  0.982585  0.059732  0.513149  0.251808
2017-01-02  0.269229  0.998531  0.523589  0.780806  0.033106
2017-01-03  0.261173  0.828637  0.638376  0.314944  0.737646
2017-01-04  0.786112  0.101750  0.286983  0.242778  0.341717
2017-01-05  0.230358  0.387392  0.918353  0.206100  0.212315
2017-01-06  0.715966  0.206121  0.153461  0.894511  0.765227
2017-01-07  0.095002  0.169697  0.465624  0.109404  0.212315
2017-01-08  0.474712  0.201928  0.471861  0.773374  0.454295
2017-01-09  0.095002  0.201928  0.228018  0.173968  0.248485
2017-01-10  0.542635  0.201928  0.132974  0.692073  0.201721

除了这种逐行方法之外,我还在此答案的开头保留了 get_closest() 的原始版本,因为我可以看到基于“最近的客户”而不是“最近的时间戳”的估算价值,它可能在将来作为其他人的参考点有用。

更新 3
OP 提供了这个更新和最终确定的解决方案:

import pandas as pd
import numpy as np

# create dataframe of random data
dates = pd.date_range('20170101', periods=10, freq='D')
ids = [10006414, 10006572, 10006630, 10006664, 10006674]
values = np.random.random(size=len(dates)*len(ids)).reshape(10,5)
df = pd.DataFrame(values, index=dates, columns=ids)

# insert random missing data
nan_size = 20
for _ in range(nan_size):
    nan_row = np.random.randint(0, df.shape[0])
    nan_col = np.random.randint(0, df.shape[1])
    df.iloc[nan_row, nan_col] = np.nan

print ('Original df is ', df)
def get_closest(row, dims, report=False):
    if row.isnull().sum():
        ts_with_nan = row.name
        nrow, ncol = dims
        df2 = pd.DataFrame(np.tile(df.loc[ts_with_nan], nrow).reshape(nrow, ncol), index=df.index, columns=df.columns)
        most_similar_ts = (df.subtract(df2, axis='rows')
                           .pow(2)
                           .sum(axis=1, skipna=True)
                           .sort_values())
        # remove current row from matched indices
        most_similar_ts = most_similar_ts[most_similar_ts.index != ts_with_nan]
        if report:
            print('MISSING VALUES found at timestamp: {}'.format(ts_with_nan.strftime('%Y-%m-%d %H:%M:%S')))
        while row.isnull().sum():
            # narrow down to only columns where replacements would occur
            match_vals = df.ix[most_similar_ts.index, df.loc[ts_with_nan].isnull()]
            # fill from closest ts
            best_match = match_vals.head(1).index[0]
            row = row.fillna(df.loc[best_match])

            if report:
                print('            REPLACEMENT timestamp: {}'.format(best_match.strftime('%Y-%m-%d %H:%M:%S')))
            # Any customers with remaining NaNs in df.loc[ts_with_nan] also have NaNs in df.loc[best_match]
            # so remove this ts from the results and repeat the process
            most_similar_ts = most_similar_ts[most_similar_ts.index != best_match]
        return row


    return row

df_new = df.apply(get_closest, axis='columns', args=(df.shape, True))  # report=True
print ('Final df is ', df_new)

【讨论】:

  • 谢谢@andrew_reece 这很有帮助,但不是我想要的——我正在使用不同的行(时间戳)从同一列(客户 ID)更新,因此需要交换维度。它还需要为每个客户和每个时间戳处理多个 Null,这可能需要迭代,但只能通过数据的子集。
  • 好的,我想我现在明白了。查看您的示例案例,您希望相对于包含缺失值的另一行(称为此行row_missing),用最小的平方和(称为row_SSE)来细化时间戳(行)。然后将row_missing 中的每个NaN 替换为与NaN 在同一列中的row_SSE 中的值。每个时间戳可能发生 0 次或更多次。这种方法并不能解决客户在row_missingrow_SEE 中都缺少数据的问题,但是您现在愿意接受这些边缘情况。这一切都正确吗?
  • 谢谢。您对问题的描述是正确的,除了我需要处理那些边缘情况。这就是为什么我最初对客户进行迭代,然后对时间戳进行迭代,以允许每个客户使用不同的row_SSE。但是最好接受这些边缘情况,然后通过遍历仍然缺少数据的客户并为每个客户调用get_closest(),但在删除该客户的情况下使用df,进行第二次扫描以获取它们。
  • ...但这不能正常工作。 In: df2Out: 10006414 10006572 10006630 10006664 10006674 2017-01-01 NaN NaN NaN NaN NaN 2017-01-02 NaN NaN NaN NaN NaN 2017-01-03 0.296268 0.296268 0.296268 0.296268 0.296268 2017-01-04 0.296268 0.296268 0.296268 0.296268 0.296268 2017-01-05 0.184410 0.184410 0.184410 0.184410 0.184410 2017-01-06 0.184410 0.184410 0.184410 0.184410 0.184410 2017-01-07 0.144101 0.144101 0.144101 0.144101 0.144101 ....
  • 这条线 df2 = pd.DataFrame(np.repeat(df.loc[ts_with_nan], nrow).values.reshape(nrow, ncol), index=df.index, columns=df.columns) 不适合我(在 py 3.6 中)
【解决方案2】:

很抱歉花了整个周末才回复您,但这里有一个如何将其转换为线程进程的示例。

首先,您需要将循环转换为接受 2 个参数的函数。这是我的版本,注意它现在接受id_ts 的元组,(我避免使用id,因为它是现有的python 函数)

def my_func(item): #takes a tuple of id and ts 
    id_, ts = item
    if pd.isnull(df_gen.loc[ts,id_]):
        # slice df to remove rows that have no filling data for this customer and use this to fill from
        fill_ts = best_ts(df_gen[df_gen[id_].notnull()],ts, df_gen.loc[ts])
        df_gen.loc[ts].fillna(df_gen.loc[fill_ts], inplace=True)

我们还需要设置一些进程,为这个函数提供我们想要检查的id_ts 的所有组合。我们可以使用非常方便的itertools 库来简化此操作:

from itertools import product
product(df_gen.columns, df_gen.index)

(即使您不想使用线程,您仍然可以使用它来减少嵌套 for 循环)

现在我们有了函数和输入,我们可以并行化它! bottom of the docs for queue 给出了一个很好的例子来说明如何设置它。所以借用那个例子:

import threading
from itertools import product
from queue import Queue

def worker():
    while True:
        item = q.get() #get the next item in the queue
        if item is None:
            break
        my_func(item) #send item to your function here
        q.task_done() #remove from queue once done

q = Queue() #create a queue object
threads = []
num_worker_threads = 8 #pick a number that works for you, I suggest trying a few between 4 and 200

#create a list of threads
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

#create a queue of items
#this example is ok for a relativley small dataframe
#for your actual big dataframe you way want to do this in chucks
for item in product(df_gen.columns, df_gen.index): 
    q.put(item) #put items in my queue

# block until all tasks are done
q.join()

我建议从您的数据子集开始并测试几个不同的工人数量。很多并不总是更好,这取决于正在运行的代码和用于运行它的硬件。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-10-01
    • 1970-01-01
    • 2015-08-28
    • 2017-06-15
    • 2017-01-04
    • 2019-01-14
    • 1970-01-01
    相关资源
    最近更新 更多