【问题标题】:How could I attach a prompt at the end of trips如何在旅行结束时附加提示
【发布时间】:2018-08-29 20:50:27
【问题描述】:

旅行

id,timestamp
1008,2003-11-03 15:00:31
1008,2003-11-03 15:02:38
1008,2003-11-03 15:03:04
1008,2003-11-03 15:18:00
1009,2003-11-03 22:00:00
1009,2003-11-03 22:02:53
1009,2003-11-03 22:03:44 
1009,2003-11-14 10:00:00
1009,2003-11-14 10:02:02
1009,2003-11-14 10:03:10

提示

id,timestamp ,mode
1008,2003-11-03 15:18:49,car 
1009,2003-11-03 22:04:20,metro
1009,2003-11-14 10:04:20,bike 

读取 csv 文件:

coordinates = pd.read_csv('coordinates.csv')
mode = pd.read_csv('prompts.csv')

我必须在旅行结束时分配每种模式

结果:

id, timestamp, mode
1008, 2003-11-03 15:00:31, null
1008, 2003-11-03 15:02:38, null
1008, 2003-11-03 15:03:04, null
1008, 2003-11-03 15:18:00, car
1009, 2003-11-03 22:00:00, null
1009, 2003-11-03 22:02:53, null
1009, 2003-11-03 22:03:44, metro
1009, 2003-11-14 10:00:00, null
1009, 2003-11-14 10:02:02, null
1009, 2003-11-14 10:03:10, bike 

注意

我使用大型数据集(4GB)和小型数据集(500MB)

【问题讨论】:

  • 是否只需要在行程结束时?否则你可以使用coordinates.merge(mode, on='id')。这将为具有指定 id 的所有行填写运输方式。
  • @tobsecret 我只想指定轨迹末端的模式,根本不指定。当我合并我的程序需要几个时间(5 小时)
  • 为什么这个标签是 pyspark?
  • @pault 我使用 Pyspark 和 Jupyter。笔记本
  • @adilblanco 但所有这些代码都是熊猫。你是在问如何在 spark 中做到这一点?

标签: python python-3.x pandas pyspark


【解决方案1】:

根据您更新的示例,您可以通过查找大于行程时间戳的第一个提示时间戳来表示行程。具有相同提示时间戳的所有行将对应于相同的行程。然后你想为每个组设置最大的行程时间戳的模式。

一种方法是使用 2 pyspark.sql.Windows。

假设您从以下两个 PySpark 数据帧开始,tripsprompts

trips.show(truncate=False)
#+----+-------------------+
#|id  |timestamp          |
#+----+-------------------+
#|1008|2003-11-03 15:00:31|
#|1008|2003-11-03 15:02:38|
#|1008|2003-11-03 15:03:04|
#|1008|2003-11-03 15:18:00|
#|1009|2003-11-03 22:00:00|
#|1009|2003-11-03 22:02:53|
#|1009|2003-11-03 22:03:44|
#|1009|2003-11-14 10:00:00|
#|1009|2003-11-14 10:02:02|
#|1009|2003-11-14 10:03:10|
#|1009|2003-11-15 10:00:00|
#+----+-------------------+

prompts.show(truncate=False)
#+----+-------------------+-----+
#|id  |timestamp          |mode |
#+----+-------------------+-----+
#|1008|2003-11-03 15:18:49|car  |
#|1009|2003-11-03 22:04:20|metro|
#|1009|2003-11-14 10:04:20|bike |
#+----+-------------------+-----+

使用id 列将这两个表连接在一起,条件是提示时间戳大于或等于行程时间戳。对于某些行程时间戳,这将导致多个提示时间戳。我们可以通过为每个('id', 'trip.timestamp') 组选择最小提示时间戳来消除这种情况——我将此临时列称为indicator,并使用窗口w1 来计算它。

接下来在('id', 'indicator') 上做一个窗口,并找到每个组的最大行程时间戳。将此值设置为等于mode。所有其他行将设置为pyspark.sql.functions.lit(None)

最后,您可以计算trips 中行程时间戳大于最大提示时间戳的所有条目。这些将是与提示不匹配的旅行。将匹配的和不匹配的结合在一起。

import pyspark.sql.functions as f
from pyspark.sql import Window

w1 = Window.partitionBy('id', 'trips.timestamp')
w2 = Window.partitionBy('id', 'indicator')

matched = trips.alias('trips').join(prompts.alias('prompts'), on='id', how='left')\
    .where('prompts.timestamp >= trips.timestamp' )\
    .select(
        'id',
        'trips.timestamp',
        'mode',
        f.when(
            f.col('prompts.timestamp') == f.min('prompts.timestamp').over(w1),
            f.col('prompts.timestamp'),
        ).otherwise(f.lit(None)).alias('indicator')
    )\
    .where(~f.isnull('indicator'))\
    .select(
        'id',
        f.col('trips.timestamp').alias('timestamp'),
        f.when(
            f.col('trips.timestamp') == f.max(f.col('trips.timestamp')).over(w2),
            f.col('mode')
        ).otherwise(f.lit(None)).alias('mode')
    )

unmatched = trips.alias('t').join(prompts.alias('p'), on='id', how='left')\
    .withColumn('max_prompt_time', f.max('p.timestamp').over(Window.partitionBy('id')))\
    .where('t.timestamp > max_prompt_time')\
    .select('id', 't.timestamp', f.lit(None).alias('mode'))\
    .distinct()

输出:

matched.union(unmatched).sort('id', 'timestamp').show()

+----+-------------------+-----+
|  id|          timestamp| mode|
+----+-------------------+-----+
|1008|2003-11-03 15:00:31| null|
|1008|2003-11-03 15:02:38| null|
|1008|2003-11-03 15:03:04| null|
|1008|2003-11-03 15:18:00|  car|
|1009|2003-11-03 22:00:00| null|
|1009|2003-11-03 22:02:53| null|
|1009|2003-11-03 22:03:44|metro|
|1009|2003-11-14 10:00:00| null|
|1009|2003-11-14 10:02:02| null|
|1009|2003-11-14 10:03:10| bike|
|1009|2003-11-15 10:00:00| null|
+----+-------------------+-----+

【讨论】:

  • 感谢您的回答,它对我帮助很大,但我注意到它不适用于有多个提示的用户。我将使用第二个提示更新用户的数据。
  • 我在 Trips 中添加了最后 3 行,在 Prompts 中添加了最后一行
  • 工作正常,谢谢。
  • 在用户没有提示的情况下。例如,如果我们消除最后一行提示。结果将被伪造。我将有 7 行而不是 10 行
  • 我尝试了所有连接,但仍然是 7 行而不是 10 行。对于投票,它是用另一个帐户完成的,因为我使用的这个帐户我没有这样做的声誉。再次感谢您的帮助。
【解决方案2】:

这将是一个幼稚的解决方案,它假设您的坐标 DataFrame 已经按时间戳排序,ID 是唯一的,并且您的数据集适合内存。如果不是后者,我建议使用 dask 并按 id 对 DataFrame 进行分区。

进口:

import pandas as pd
import numpy as np

首先我们加入两个 DataFrame。这将填充每个 id 的整个模式列。我们加入索引是因为这将加快操作速度,另请参见“Improve Pandas Merge performance”。

mode = mode.set_index('id')
coordinates = coordinates.set_index('id')
merged = coordinates.join(mode, how='left')

我们需要索引是唯一值才能使我们的 groupby 操作正常工作。

merged = merged.reset_index()

然后我们应用一个函数,该函数将替换每个 id 模式列中除最后一行之外的所有内容。

def clean_mode_col(df):
    cleaned_mode_col = df['mode'].copy()
    cleaned_mode_col.iloc[:-1] = np.nan
    df['mode'] = cleaned_mode_col
    return df
merged  = merged.groupby('id').apply(clean_mode_col)

如上所述,您可以使用 dask 来并行执行合并代码,如下所示:

import dask.dataframe as dd
dd_coordinates = dd.from_pandas(coordinates).set_index('id')
dd_mode = dd.from_pandas(mode).set_index('id')
merged = dd.merge(dd_coordinates, dd_mode, left_index=True, right_index=True)
merged = merged.compute() #returns pandas DataFrame

set_index 操作很慢,但使合并方式更快。

我没有测试这段代码。请提供包含您的 DataFrames 的可复制粘贴代码,这样我就不必复制和粘贴您在描述中的所有文件(提示:使用 pd.DataFrame.to_dict 将您的 DataFrame 导出为字典并复制和粘贴进入你的代码)。

【讨论】:

  • 感谢您的回答,我的问题在合并中。在我的情况下,合并需要很多时间来执行(平均:5小时)然后我想优化它。我试过你的代码在一个小数据集上工作得很好。
  • 是的,这可能是因为您的桌子很大。您可以尝试使 id 列成为两个 DataFrame 中的索引 (coordinates = coordinate.set_index('id'), mode = mode.set_index('id)) ,然后像这样合并:merged = coordinates.merge(mode , right_index=True, left_index=True) 再次,我鼓励您为此使用 dask,因为它可以让您并行化此过程,并且基本上是相同的。
  • 谢谢,但我不能用几个提示来对待用户!
  • 请查看更新后的答案 - 加入索引应该会大大加快进程。至于 Dask 部分 - 它仅用于合并部分,但只要您的数据适合内存,您可能希望像以前一样使用 pandas 执行 groupby apply pandas。
猜你喜欢
  • 2018-05-04
  • 2011-02-07
  • 2023-03-25
  • 2017-01-20
  • 2013-09-19
  • 2021-01-20
  • 1970-01-01
  • 1970-01-01
  • 2012-07-04
相关资源
最近更新 更多