【Title】: Spark: Python Windowed Functions for Data Frames
【Posted】: 2016-02-28 13:00:20
【Question】:

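The use case is to capture the time difference between streaming sensor entries that share the same station and part, compare it against a tolerance, and trigger an alert when it is out of range. I am currently parsing the fields into a data frame and registering it as a table so that I can run a SQL query with the LAG function.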

events = rawFilter.map(lambda x: x.split("|")).map(lambda x: (x[0], x[1], x[2]))
eventSchema = StructType(
  [StructField("station", StringType(), False),
  StructField("part", StringType(), False),
  StructField("event", TimestampType(), False)])

eventDF = sqlContext.createDataFrame(events,eventSchema)
eventDF.registerTempTable("events_table")

%sql select station, part, event, prev_event, 
    cast(event as double) - cast(prev_event as double) as CycleTime 
    from (select station, part, event, 
    LAG(event) over (Partition BY station, part Order BY event) as Prev_Event 
    from events_table) x limit 10

Example Streaming Sensor Data:
station1|part1|<timestamp>
station2|part2|<timestamp>
station3|part3|<timestamp>
station1|part1|<timestamp>
station1|part1|<timestamp>
station1|part1|<timestamp>
station3|part3|<timestamp>
station1|part1|<timestamp>

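What I would like to understand is how to apply the windowed function to the data frame itself, so that the resulting table already has the time difference computed.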

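Part 2 of this question is how to handle a change of part. In that case CycleTime should not be calculated (or should stop); instead, the time difference between two different parts on the same station is a separate calculation called ChangeOver. I don't see how to do this with Spark Streaming, because the window could extend for several days before the part changes, so I am considering pushing the data to HBase or something similar to calculate ChangeOver.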

【Discussion】:

    Tags: python sql apache-spark apache-spark-sql spark-streaming


    【Solution 1】:

    Window definitions on DataFrames closely follow SQL conventions: the partitionBy, orderBy, rangeBetween, and rowsBetween methods correspond to the equivalent SQL clauses.

    from pyspark.sql.functions import col, lag, unix_timestamp
    from pyspark.sql.window import Window
    
    rawDF = sc.parallelize([
        ("station1", "part1", "2015-01-03 00:11:02"),
        ("station2", "part2", "2015-02-01 10:20:10"),
        ("station3", "part3", "2015-03-02 00:30:00"),
        ("station1", "part1", "2015-05-01 01:07:00"),
        ("station1", "part1", "2015-01-13 05:16:10"),
        ("station1", "part1", "2015-11-20 10:22:40"),
        ("station3", "part3", "2015-09-04 03:15:22"),
        ("station1", "part1", "2015-03-05 00:41:33")
    ]).toDF(["station", "part", "event"])
    
    eventDF = rawDF.withColumn("event", unix_timestamp(col("event")))
    
    w = Window.partitionBy(col("station")).orderBy(col("event"))
    
    (eventDF
      .withColumn("prev_event", lag(col("event")).over(w))
      .withColumn("cycle_time", col("event") - col("prev_event")))
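
To make the window semantics concrete without a Spark cluster, here is a minimal plain-Python sketch of what a lag over a partitioned, ordered window computes. It is an illustration only, not part of the answer: the helper name `cycle_times` and the sample rows are hypothetical, and the partition key is (station, part) as in the question's SQL.

```python
from collections import defaultdict
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

def cycle_times(rows):
    """Plain-Python stand-in for
    LAG(event) OVER (PARTITION BY station, part ORDER BY event).

    rows: iterable of (station, part, "YYYY-MM-DD HH:MM:SS") tuples.
    Returns (station, part, event, prev_event, cycle_time_seconds) tuples.
    """
    # PARTITION BY station, part
    partitions = defaultdict(list)
    for station, part, event in rows:
        partitions[(station, part)].append(datetime.strptime(event, FMT))

    result = []
    for (station, part), events in sorted(partitions.items()):
        events.sort()   # ORDER BY event
        prev = None     # lag() has no value for the first row of a partition
        for event in events:
            cycle = (event - prev).total_seconds() if prev is not None else None
            result.append((station, part, event, prev, cycle))
            prev = event
    return result

# Hypothetical sample rows, made up for this illustration.
rows = [
    ("station1", "part1", "2015-01-03 00:11:02"),
    ("station1", "part1", "2015-01-03 00:11:32"),
    ("station2", "part2", "2015-01-03 00:12:00"),
    ("station1", "part1", "2015-01-03 00:12:02"),
]

for row in cycle_times(rows):
    print(row)
```

The first row of each partition has no previous event, so its cycle time is empty, which mirrors the null that lag produces at the start of a window partition.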
    

    【Comments】:

    • You nailed it!! I made the changes you mentioned and had to add a cast on both dates. I also added the Part column to the partitionBy call, with the following result: w = Window.partitionBy(col("Station"), col("Part")).orderBy(col("Event")) resultDF = (eventDF .withColumn("Prev_Event", lag(col("Event")).over(w)) .withColumn("Cycle_Time", col("Event").cast("double") - col("Prev_Event").cast("double")))
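
Part 2 of the question (ChangeOver) is not addressed in this thread. One possible approach, offered only as a sketch under stated assumptions: partition by station alone, look at the previous row's part as well as its timestamp, and label the gap as a cycle time when the part is unchanged or as a changeover when it differs. In Spark this would presumably mean applying lag to both the part and event columns over a window partitioned by station; the plain-Python version below just shows the logic. The helper name `classify_gaps` and the sample rows are hypothetical.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

def classify_gaps(rows):
    """Per station, label each gap between consecutive events.

    Same part as the previous row -> cycle_time; different part -> change_over.
    Returns (station, part, event, cycle_time, change_over) tuples, in seconds.
    """
    last_seen = {}  # station -> (part, timestamp) of the most recent row
    result = []
    # Sort by station, then event; this timestamp format sorts correctly as text.
    for station, part, event in sorted(rows, key=lambda r: (r[0], r[2])):
        ts = datetime.strptime(event, FMT)
        cycle_time = change_over = None
        if station in last_seen:
            prev_part, prev_ts = last_seen[station]
            gap = (ts - prev_ts).total_seconds()
            if part == prev_part:
                cycle_time = gap
            else:
                change_over = gap
        result.append((station, part, event, cycle_time, change_over))
        last_seen[station] = (part, ts)
    return result

# Hypothetical sample rows, made up for this illustration.
rows = [
    ("station1", "part1", "2015-01-03 00:00:00"),
    ("station1", "part1", "2015-01-03 00:00:45"),
    ("station1", "part2", "2015-01-03 00:10:45"),
    ("station1", "part2", "2015-01-03 00:11:15"),
]

for row in classify_gaps(rows):
    print(row)
```

In this sketch only the most recent (part, timestamp) pair per station is kept, so the amount of state does not grow with how long a part runs. Whether that fits a particular Spark Streaming setup is left open here.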