【发布时间】:2016-02-28 13:00:20
【问题描述】:
用例是捕获流式传感器条目之间的时间差异,其中站点和部件相同,以与容差进行比较,并在超出范围时触发警报。我目前正在将字段解析为数据框并将其注册为表以使用 LAG 函数执行 SQL 查询。
events = rawFilter.map(lambda x: x.split("|")).map(lambda x: (x[0], x[1], x[2]))
eventSchema = StructType(
[StructField("station", StringType(), False),
StructField("part", StringType(), False),
StructField("event", TimestampType(), False)])
eventDF = sqlContext.createDataFrame(events,eventSchema)
eventDF.registerTempTable("events_table")
%sql select station, part, event, prev_event,
cast(event as double) - cast(prev_event as double) as CycleTime
from (select station, part, event,
LAG(event) over (Partition BY station, part Order BY event) as Prev_Event
from events_table) x limit 10
Example Streaming Sensor Data:
station1|part1|<timestamp>
station2|part2|<timestamp>
station3|part3|<timestamp>
station1|part1|<timestamp>
station1|part1|<timestamp>
station1|part1|<timestamp>
station3|part3|<timestamp>
station1|part1|<timestamp>
我想了解的是如何在数据框中完成窗口函数,以便生成的表已经计算出时间差?
本题的第 2 部分是了解当部件发生变化时如何处理。在这种情况下,不应计算或停止 CycleTime;但是,同一站的两个不同部分之间的时间差是另一个称为 ChangeOver 的计算。我不知道如何使用 Spark Streaming 完成此操作,因为窗口可能会在 Part 更改之前延长几天。所以我正在考虑将数据推送到 Hbase 或其他东西来计算 ChangeOver。
【问题讨论】:
标签: python sql apache-spark apache-spark-sql spark-streaming