【Posted】: 2019-04-26 07:00:29
【Question】:
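I am writing a Spark Structured Streaming program, and I need to create an additional column containing a lagged difference.
To reproduce the problem, here is a code snippet. It reads the file data.json stored in the data folder: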
[
{"id": 77,"type": "person","timestamp": 1532609003},
{"id": 77,"type": "person","timestamp": 1532609005},
{"id": 78,"type": "crane","timestamp": 1532609005}
]
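Spark's built-in JSON source expects JSON Lines by default (one complete object per line); a file wrapped in `[` ... `]` like the one above is normally read with the `multiLine` option enabled, and with the default settings it may yield corrupt or null rows. A minimal sketch, assuming you just want to regenerate the sample file in JSON Lines form so the reader settings in the question see three clean records. `to_json_lines` and `write_json_lines` are hypothetical helper names, not part of Spark.

```python
import json
import os

# The three sample records from data.json above.
records = [
    {"id": 77, "type": "person", "timestamp": 1532609003},
    {"id": 77, "type": "person", "timestamp": 1532609005},
    {"id": 78, "type": "crane", "timestamp": 1532609005},
]


def to_json_lines(rows):
    """Serialize rows as JSON Lines: one JSON object per line, no enclosing array."""
    return "".join(json.dumps(row) + "\n" for row in rows)


def write_json_lines(path, rows):
    """Write rows to path in JSON Lines format, creating the parent folder if needed."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(to_json_lines(rows))
```

Calling `write_json_lines("data/data.json", records)` before starting the query produces a file with one record per line.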
Code:
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[2]") \
    .getOrCreate()

# Schema of the JSON records in data/
schema = StructType([
    StructField("id", IntegerType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType())
])

# Streaming DataFrame over the files in data/
ds = spark \
    .readStream \
    .format("json") \
    .schema(schema) \
    .load("data/")

# Previous timestamp per id via lag() over a non-time-based window;
# this is the operation Spark rejects for streaming DataFrames
diff_window = Window.partitionBy("id").orderBy("timestamp")
ds = ds.withColumn("prev_timestamp", func.lag(ds.timestamp).over(diff_window))

query = ds \
    .writeStream \
    .format('console') \
    .start()
query.awaitTermination()
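For reference, this is the column the code is meant to produce. The plain-Python sketch below (no Spark involved) mimics what `lag("timestamp").over(Window.partitionBy("id").orderBy("timestamp"))` returns on a static DataFrame for the three sample records; `add_prev_timestamp` is a hypothetical name, and the output is sorted by id only to make it deterministic, since Spark does not guarantee row order.

```python
from collections import defaultdict

rows = [
    {"id": 77, "type": "person", "timestamp": 1532609003},
    {"id": 77, "type": "person", "timestamp": 1532609005},
    {"id": 78, "type": "crane", "timestamp": 1532609005},
]


def add_prev_timestamp(rows):
    """Batch equivalent of lag(timestamp) over a window partitioned by id, ordered by timestamp."""
    by_id = defaultdict(list)
    for row in rows:
        by_id[row["id"]].append(row)
    out = []
    for key in sorted(by_id):
        prev = None  # lag() yields null for the first row of each partition
        for row in sorted(by_id[key], key=lambda r: r["timestamp"]):
            out.append({**row, "prev_timestamp": prev})
            prev = row["timestamp"]
    return out
```

For id 77 the second row gets `prev_timestamp = 1532609003`; the first row of each id gets `None`.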
I get this error:
pyspark.sql.utils.AnalysisException: u'Non-time-based windows are not supported on streaming DataFrames/Datasets;;\nWindow [lag(timestamp#71L, 1, null) windowspecdefinition(host_id#68, timestamp#71L ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS prev_timestamp#129L]
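The error comes from Spark's check for unsupported streaming operations: `Window`-spec functions such as `lag` are not allowed on streaming DataFrames, and the only windows permitted there are time-based ones built with `functions.window` for aggregations. Conceptually, a previous value per `id` requires remembering rows from earlier micro-batches. Spark exposes that kind of per-key state through its stateful operators (for example `flatMapGroupsWithState` in Scala/Java, or `applyInPandasWithState` in PySpark 3.4+), while `foreachBatch` (Spark 2.4+) runs ordinary batch code, window functions included, on each micro-batch but only sees the rows of that batch. The sketch below is plain Python, not Spark code, and only illustrates the state such an approach has to keep; `make_lag_tracker` is a hypothetical name, and it assumes rows for each id arrive in timestamp order across batches (no late data).

```python
def make_lag_tracker():
    """Return a function that adds prev_timestamp to each micro-batch,
    remembering the latest timestamp per id between calls."""
    last_seen = {}  # id -> latest timestamp seen so far (assumes in-order arrival)

    def process_batch(batch):
        out = []
        # Order within the batch the same way the window spec would.
        for row in sorted(batch, key=lambda r: (r["id"], r["timestamp"])):
            out.append({**row, "prev_timestamp": last_seen.get(row["id"])})
            last_seen[row["id"]] = row["timestamp"]
        return out

    return process_batch


# Example: the sample records split across two micro-batches.
tracker = make_lag_tracker()
batch1 = tracker([{"id": 77, "type": "person", "timestamp": 1532609003}])
batch2 = tracker([
    {"id": 77, "type": "person", "timestamp": 1532609005},
    {"id": 78, "type": "crane", "timestamp": 1532609005},
])
```

Unlike a per-batch window, the second batch still sees id 77's earlier timestamp, which is exactly what the streaming `lag` in the question would need.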
【Question comments】:
-
I'm running into the same problem. Is there any solution?
-
Did you find a solution?
Tags: apache-spark pyspark apache-spark-sql spark-structured-streaming