【问题标题】:Running chained queries using structured streaming (PySpark)使用结构化流 (PySpark) 运行链式查询
【发布时间】:2018-08-10 09:15:32
【问题描述】:

我的代码是这样的

df = spark.readStream.option("header","true") \
    .schema(df_schema)\
    .csv(df_file)
df2 = df.filter(df.col == 1)
df3 = df2.withColumn("new_col", udf_f(df2.some_col))
dfc = df3.where(df3.new_col == 2).count()
query = dfc.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

我在dfc 行收到错误消息Queries with streaming sources must be executed with writeStream.start(),但我不确定我做错了什么。 Spark 结构化流不支持这样的链式查询吗?据我所知,我没有做任何分支。

编辑:

通过从dfc 行中删除count(),我得到了一个由query.awaitTermination() 调用引起的新错误StreamingQueryException: Exception thrown in awaitResult。知道为什么count() 不起作用以及为什么会出现新错误吗?

编辑 2:

如果我直接登录到控制台而不在 df 之后运行所有中间查询,它就可以工作。但是,每次我尝试运行其他查询时,都会引发 StreamingQueryException

【问题讨论】:

    标签: python apache-spark pyspark spark-structured-streaming


    【解决方案1】:

    由于structured streaming 的性质,不可能以与静态数据帧相同的方式获取计数。创建流时,Spark 使用 trigger 轮询源以获取新数据。如果有任何 Spark 将其拆分为小的 DataFrame(微批次)并沿流传递(转换、聚合、输出)。

    如果您需要获取记录数,您可以添加listener to get progress updates 并在onQueryProgress(QueryProgressEvent event) 中获取输入数。

    很难说为什么你会得到StreamingQueryException,因为filter()withColumn() 在结构化流中正常工作。 您是否在控制台中看到其他可能导致 Exception thrown in awaitResult 的错误?

    顺便说一句,如果您在单个会话中有多个流,您应该使用spark.streams.awaitAnyTermination() 阻止,直到其中任何一个终止。

    以下查询应该可以正常工作:

    query = spark.readStream
        .option("header","true") \
        .schema(df_schema)\
        .csv(df_file)\
        .filter(df.col == 1)\
        .withColumn("new_col", udf_f(df2.some_col))\
        .writeStream\
        .format("console")\
        .outputMode("append")\
        .start()
    
    query.awaitTermination()
    # or spark.streams().awaitAnyTermination()
    

    【讨论】:

    • 谢谢。导致异常的错误是java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) at scala.None$.get(Option.scala:345)如果我删除流组件,即如果我对数据进行采样,代码可以工作。
    • 我还在数据框上使用了join。这有什么影响吗?
    • 你得到的错误看起来像一个warning in metrics,它不影响火花数据处理。如果您使用的是 AWS EMR,则应尝试将其降级到 5.9.0 或直接取消警告。
    • 您可以将静态数据帧加入流,从 Spark 2.3 开始支持stream-stream joins。如果您使用的是 EMR,那么您应该等待新版本的 EMR,因为最新版本只有 Spark 2.2.1。
    • 我的错误终止了工作。我得到..: java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) at scala.None$.get(Option.scala:345) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
    猜你喜欢
    • 1970-01-01
    • 2019-10-29
    • 2021-02-06
    • 2018-08-11
    • 2019-04-04
    • 1970-01-01
    • 1970-01-01
    • 2021-02-19
    • 1970-01-01
    相关资源
    最近更新 更多