【问题标题】:How to calculate lag difference in Spark Structured Streaming?如何计算 Spark Structured Streaming 中的滞后差异?
【发布时间】:2019-04-26 07:00:29
【问题描述】:

我正在编写一个 Spark Structured Streaming 程序。我需要创建一个具有滞后差异的附加列。

为了重现我的问题,我提供了代码 sn-p。此代码使用存储在data 文件夹中的data.json 文件:

[
  {"id": 77,"type": "person","timestamp": 1532609003},
  {"id": 77,"type": "person","timestamp": 1532609005},
  {"id": 78,"type": "crane","timestamp": 1532609005}
]

代码:

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[2]") \
    .getOrCreate()

schema = StructType([
    StructField("id", IntegerType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType())
])

ds = spark \
    .readStream \
    .format("json") \
    .schema(schema) \
    .load("data/")

diff_window = Window.partitionBy("id").orderBy("timestamp")
ds = ds.withColumn("prev_timestamp", func.lag(ds.timestamp).over(diff_window))

query = ds \
    .writeStream \
    .format('console') \
    .start()

query.awaitTermination()

我收到此错误:

pyspark.sql.utils.AnalysisException: u'Non-time-based windows are not 支持流式 DataFrames/Datasets;;\nWindow [lag(timestamp#71L, 1, null) windowspecdefinition(host_id#68, 时间戳#71L ASC NULLS FIRST,前 1 和 1 之间的行 PRECEDING) 作为 prev_timestamp#129L]

【问题讨论】:

  • 我遇到了同样的问题。有什么解决办法吗?
  • 你有解决办法吗?

标签: apache-spark pyspark apache-spark-sql spark-structured-streaming


【解决方案1】:

pyspark.sql.utils.AnalysisException: u'流数据帧/数据集不支持非基于时间的窗口

意味着您的窗口应该基于timestamp 列。因此,如果您每秒都有一个数据点,并且您创建了一个带有 stride10s30s 窗口,那么您的结果窗口将创建一个新的 window 列,其中包含 startend 列这将包含时间戳的差异30s

你应该这样使用窗口:

words = words.withColumn('date_time', F.col('date_time').cast('timestamp'))

w = F.window('date_time', '30 seconds', '10 seconds')
words = words \
   .withWatermark('date_format', '1 minutes') \
   .groupBy(w).agg(F.mean('value'))

【讨论】:

    猜你喜欢
    • 2020-09-12
    • 2020-03-19
    • 1970-01-01
    • 2020-02-19
    • 1970-01-01
    • 2023-03-31
    • 1970-01-01
    • 1970-01-01
    • 2019-08-04
    相关资源
    最近更新 更多