【发布时间】:2018-08-10 09:15:32
【问题描述】:
我的代码是这样的
df = spark.readStream.option("header","true") \
.schema(df_schema)\
.csv(df_file)
df2 = df.filter(df.col == 1)
df3 = df2.withColumn("new_col", udf_f(df2.some_col))
dfc = df3.where(df3.new_col == 2).count()
query = dfc.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
我在dfc 行收到错误消息Queries with streaming sources must be executed with writeStream.start(),但我不确定我做错了什么。 Spark 结构化流不支持这样的链式查询吗?据我所知,我没有做任何分支。
编辑:
通过从dfc 行中删除count(),我得到了一个由query.awaitTermination() 调用引起的新错误StreamingQueryException: Exception thrown in awaitResult。知道为什么count() 不起作用以及为什么会出现新错误吗?
编辑 2:
如果我直接登录到控制台而不在 df 之后运行所有中间查询,它就可以工作。但是,每次我尝试运行其他查询时,都会引发 StreamingQueryException。
【问题讨论】:
标签: python apache-spark pyspark spark-structured-streaming