【问题标题】:What does the 'pyspark.sql.functions.window' function's 'startTime' argument do?'pyspark.sql.functions.window' 函数的 'startTime' 参数有什么作用?
【发布时间】:2017-01-09 05:18:58
【问题描述】:

在官方文档中只有一个简单的例子:

startTime 是相对于 1970-01-01 00:00:00 UTC 的偏移量 从哪个开始 窗口间隔。例如,为了让每小时滚动的窗口开始 15 分钟 过了一小时,例如12:15-13:15, 13:15-14:15... 提供startTime15 minutes

但我想知道它如何处理所有参数。

例如:

ts_list = map(lambda x: datetime.datetime(2017, 1, 9, 9, 0, 10) + datetime.timedelta(seconds=x), range(30))
rdd = spark.sparkContext.parallelize(ts_list).map(lambda x: (x, 1))
df = spark.createDataFrame(rdd, schema=['dt', 'val'])
win = df.groupBy(window("dt", "5 seconds", '4 seconds', '3 seconds')).agg(sum("val").alias("sum"))
pprint.pprint(win.select(win['window']['start'].cast('string').alias('start'),
                         win['window']['end'].cast('string').alias('end')).collect())

输出:

[Row(start=u'2017-01-09 09:00:19', end=u'2017-01-09 09:00:24'),                 
 Row(start=u'2017-01-09 09:00:35', end=u'2017-01-09 09:00:40'),
 Row(start=u'2017-01-09 09:00:27', end=u'2017-01-09 09:00:32'),
 Row(start=u'2017-01-09 09:00:07', end=u'2017-01-09 09:00:12'),
 Row(start=u'2017-01-09 09:00:31', end=u'2017-01-09 09:00:36'),
 Row(start=u'2017-01-09 09:00:39', end=u'2017-01-09 09:00:44'),
 Row(start=u'2017-01-09 09:00:11', end=u'2017-01-09 09:00:16'),
 Row(start=u'2017-01-09 09:00:23', end=u'2017-01-09 09:00:28'),
 Row(start=u'2017-01-09 09:00:15', end=u'2017-01-09 09:00:20')]

那为什么?

【问题讨论】:

    标签: apache-spark dataframe pyspark apache-spark-sql


    【解决方案1】:

    它与您的数据何时开始无关。当然,第一个窗口只会出现,直到您在该窗口中有一些数据。但是 startTime 与您的数据无关。 正如文档所说,startTime 是相对于 1970-01-01 19:00:00 UTC 开始窗口间隔的偏移量。 如果您创建这样的窗口:
    w = F.window("date_field", "7 days", startTime='6 days')

    spark 将生成从 1970-01-06 开始的 7 天的窗口:

    1970-01-06 19:00:00, 1970-01-13 19:00:00
    1970-01-13 19:00:00, 1970-01-20 19:00:00
    1970-01-20 19:00:00, 1970-01-27 19:00:00
    ...
    2017-05-16 19:00:00, 2017-05-23 19:00:00
    (如果你继续计算你会到达这个日期) ...
    但是您只会看到与数据框日期相关的窗口。 19:00:00 是因为我的时区是 -05
    如果你像这样创建一个窗口:

    w = F.window("date_field", "7 days", startTime='2 days')

    spark 将生成从 1970-01-02 开始的 7 天的窗口:

    1970-01-02 19:00:00, 1970-01-09 19:00:00
    1970-01-09 19:00:00, 1970-01-16 19:00:00
    ...
    2017-05-19 19:00:00, 2017-05-26 19:00:00
    (如果你继续计算你会到达这个日期)
    ...

    同样,您只会看到与数据框日期相关的窗口。

    那么,如何计算数据窗口的开始日期?
    您只需要计算自 1970 年 1 月 1 日以来开始日期的天数,然后将其除以窗口长度并取余数。这将是偏移天数的开始时间。


    我将用一个例子来解释它: 假设您需要您的窗口从 2017-05-21 开始,并且 窗口的长度是 7 天。我将为示例创建一个虚拟数据框。

    row = Row("id", "date_field", "value")
    df = sc.parallelize([
    row(1, "2017-05-23", 5.0),
    row(1, "2017-05-26", 10.0),
    row(1, "2017-05-29", 4.0),
    row(1, "2017-06-10", 3.0),]).toDF()
    
    start_date = datetime(2017, 5, 21, 19, 0, 0) # 19:00:00 because my 
    timezone 
    days_since_1970_to_start_date =int(time.mktime(start_date.timetuple())/86400)
    offset_days = days_since_1970_to_start_date % 7
    
    w = F.window("date_field", "7 days", startTime='{} days'.format(
                                            offset_days))
    
    df.groupby("id", w).agg(F.sum("value")).orderBy("window.start").show(10, False)
    

    你会得到:

    +---+------------------------------------------+----------+
    |id |window                                    |sum(value)|
    +---+------------------------------------------+----------+
    |1  |[2017-05-21 19:00:00, 2017-05-28 19:00:00]|15.0      |
    |1  |[2017-05-28 19:00:00, 2017-06-04 19:00:00]|4.0       |
    |1  |[2017-06-04 19:00:00, 2017-06-11 19:00:00]|3.0       |
    +---+------------------------------------------+----------+
    

    【讨论】:

    • 你有w = F.windowF的导入是什么?
    • 从 pyspark.sql 导入函数为 F
    【解决方案2】:

    让我们一步一步来。

    • 您的数据开始于2017-01-09 09:00:10:

      df.orderBy("dt").show(3, False)
      
      +---------------------+---+
      |dt                   |val|
      +---------------------+---+
      |2017-01-09 09:00:10.0|1  |
      |2017-01-09 09:00:11.0|1  |
      |2017-01-09 09:00:12.0|1  |
      +---------------------+---+
      
    • 第一个完整小时是2017-01-09 09:00:00.0:

      from pyspark.sql.functions import min as min_, date_format
      (df
         .groupBy()
         .agg(date_format(min_("dt"), "yyyy-MM-dd HH:00:00"))
         .show(1, False))
      
      +-----------------------------------------+
      |date_format(min(dt), yyyy-MM-dd HH:00:00)|
      +-----------------------------------------+
      |2017-01-09 09:00:00                      |
      +-----------------------------------------+
      
    • 因此,第一个窗口将从2017-01-09 09:03:00 开始,即2017-01-09 09:00:00 + startTime(3 秒)并在2017-01-09 09:08:00 结束(2017-01-09 09:00:00 + startTime + windowDuration)。

      此窗口为空([09:03:00, 09:08:00) 范围内没有数据。

    • 第一个(和第二个)数据点将落入下一个窗口,即 [09:00:07.0, 09:00:12.0),它从 2017-01-09 09:00:00 + startTime + 1 * slideDuration 开始。

      win.orderBy("window.start").show(3, False)
      
      +---------------------------------------------+---+
      |window                                       |sum|
      +---------------------------------------------+---+
      |[2017-01-09 09:00:07.0,2017-01-09 09:00:12.0]|2  |
      |[2017-01-09 09:00:11.0,2017-01-09 09:00:16.0]|5  |
      |[2017-01-09 09:00:15.0,2017-01-09 09:00:20.0]|5  |
      +---------------------------------------------+---+
      

      下一个窗口开始2017-01-09 09:00:00 + startTime + n * slideDuration for n in 1..

    【讨论】:

      猜你喜欢
      • 2018-06-29
      • 2020-04-04
      • 2020-08-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-16
      相关资源
      最近更新 更多