【问题标题】:Joining two spark dataframes on time (TimestampType) in python在python中按时加入两个火花数据帧(TimestampType)
【发布时间】:2015-08-18 06:21:57
【问题描述】:

我有两个数据框,我想基于一列加入它们,但需要注意的是,该列是一个时间戳,并且该时间戳必须在某个偏移量(5 秒)内才能加入记录。更具体地说,dates_dfdate=1/3/2015:00:00:00 中的记录应该与 events_dftime=1/3/2015:00:00:01 连接,因为两个时间戳之间的时间差在 5 秒内。

我试图让这个逻辑与 python spark 一起工作,这非常痛苦。人们如何在 spark 中进行这样的连接?

我的方法是向dates_df 添加两个额外的列,这将确定lower_timestampupper_timestamp 的边界,偏移量为5 秒,并执行条件连接。这就是它失败的地方,更具体地说:

joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)

joined_df.explain()

仅捕获查询的最后部分:

Filter (time#6 < upper_timestamp#4)
 CartesianProduct
 ....

它给了我一个错误的结果。

我真的必须为每个不等式做一个完整的笛卡尔连接,同时删除重复项吗?

这里是完整的代码:

from datetime import datetime, timedelta

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf


master = 'local[*]'
app_name = 'stackoverflow_join'

conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

def lower_range_func(x, offset=5):
    return x - timedelta(seconds=offset)

def upper_range_func(x, offset=5):
    return x + timedelta(seconds=offset)


lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())

dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
dates_schema = StructType(dates_fields)

dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
dates_df = sqlContext.createDataFrame(dates, dates_schema)

dates_df.show()

# extend dates_df with time ranges
dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
           withColumn('upper_timestamp', upper_range(dates_df['date']))


event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
event_schema = StructType(event_fields)

events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
events_df = sqlContext.createDataFrame(events, event_schema)

events_df.show()

# finally, join the data
joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)    

joined_df.show()

我得到以下输出:

+-----+--------------------+
| name|                date|
+-----+--------------------+
|day_1|2015-01-01 00:00:...|
|day_2|2015-01-02 00:00:...|
|day_3|2015-01-03 00:00:...|
|day_4|2015-01-04 00:00:...|
+-----+--------------------+

+--------------------+-------+
|                time|  event|
+--------------------+-------+
|2015-01-03 00:00:...|meeting|
+--------------------+-------+


+-----+--------------------+--------------------+--------------------+--------------------+-------+
| name|                date|     lower_timestamp|     upper_timestamp|                time|  event|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
|day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
|day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
+-----+--------------------+--------------------+--------------------+--------------------+-------+

【问题讨论】:

  • Spark SQL 似乎可以优雅地处理它。 results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp &lt; events.time and events.time &lt; dates.upper_timestamp") 成功了。
  • 只是一个想法:将这个 dates_df.lower_timestamp
  • @ayan:这个方法我也试过了,还是不行。
  • 看起来像一个错误......
  • 我不懂 Python,但这在 Scala 中应该很简单。您甚至不需要创建新列。我将创建一个 UDF,它可以在时间戳中添加或减去秒数,然后重新运行。然后在两个 UDF 调用结果之间的一个时间戳进行连接。

标签: join apache-spark apache-spark-sql pyspark


【解决方案1】:

我确实使用explain() 触发 SQL 查询以查看它是如何完成的,并在 python 中复制了相同的行为。首先是如何用 SQL spark 做同样的事情:

dates_df.registerTempTable("dates")
events_df.registerTempTable("events")
results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time and  events.time < dates.upper_timestamp")
results.explain()

这行得通,但问题是关于如何在python中做到这一点,所以解决方案似乎只是一个简单的连接,然后是两个过滤器:

joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp)

joined_df.explain() 产生与 sql spark results.explain() 相同的查询,所以我假设事情就是这样完成的。

【讨论】:

    【解决方案2】:

    虽然一年后,但可能会帮助其他人..

    正如您所说,在您的情况下,完整的笛卡尔积是疯狂的。您的匹配记录将及时关闭(5 分钟),因此如果您首先根据时间戳将记录分组到存储桶中,然后加入该存储桶上的两个数据帧,然后才应用,则可以利用这一点并节省大量时间过滤器。使用该方法会导致 Spark 使用 SortMergeJoin 而不是 CartesianProduct,从而大大提高性能。

    这里有一个小警告 - 您必须同时匹配存储桶和下一个存储桶。

    最好在我的博客中用工作代码示例进行解释(Scala + Spark 2.0,但您也可以在 python 中实现相同...)

    http://zachmoshe.com/2016/09/26/efficient-range-joins-with-spark.html

    【讨论】:

    • 不要链接到随机的博客文章(即使是你的),在这里写下例子,然后链接到你的博客。
    猜你喜欢
    • 2016-07-22
    • 2020-01-21
    • 1970-01-01
    • 2020-08-04
    • 1970-01-01
    • 1970-01-01
    • 2023-03-13
    • 1970-01-01
    • 2019-07-07
    相关资源
    最近更新 更多