【问题标题】:pyspark 2.4.x structured streaming foreachBatch not runningpyspark 2.4.x 结构化流 foreachBatch 未运行
【发布时间】:2019-10-29 17:52:58
【问题描述】:

我正在使用 spark 2.4.0 和 python 3.6。我正在开发一个带有 pyspark 结构化流操作的 python 程序。该程序从两个套接字运行两个读取流,然后将这两个流数据帧合并。我尝试了 spark 2.4.0 和 2.4.3,但没有任何改变。 然后我执行一个独特的写入流,以便只写入一个输出流数据帧。效果很好。 但是,由于我还需要为所有微批处理编写一个非流数据集,所以我在 writestream 中编写了一个 foreachBatch 调用。那行不通。

我将 spark.scheduler.mode=FAIR 放在 spark.defaults.conf 中。我正在运行 spark-submit,但即使我直接尝试使用 python3,它也根本不起作用。看起来它没有执行 foreachBatch 中提到的 splitStream 函数。我尝试在 splitStream 函数中添加一些打印,但没有任何效果。

我做了很多尝试,但没有任何改变,我通过 spark-submit 和 python 提交。我正在开发一个 spark 独立集群。

inDF_1 = spark \
    .readStream \
    .format('socket') \
    .option('host', host_1) \
    .option('port', port_1) \
    .option("maxFilesPerTrigger", 1) \
    .load()

inDF_2 = spark \
    .readStream \
    .format('socket') \
    .option('host', host_2) \
    .option('port', port_2) \
    .option("maxFilesPerTrigger", 1) \
    .load() \
    .coalesce(1)

inDF = inDF_1.union(inDF_2)

#--------------------------------------------------#
#  write streaming raw dataser R-01 plateMeasures  #
#--------------------------------------------------#

def splitStream(df, epoch_id):
    df \
        .write \
        .format('text') \
        .outputMode('append') \
        .start(path = outDir0)

    listDF = df.collect()
    print(listDF)
    pass

stageDir = dLocation.getLocationDir('R-00')
outDir0 = dLocation.getLocationDir(outList[0])
chkDir = dLocation.getLocationDir('CK-00')
query0 = programName + '_q0'
q0 = inDF_1 \
        .writeStream \
        .foreachBatch(splitStream) \
        .format('text') \
        .outputMode('append') \
        .queryName(query0) \
        .start(path = stageDir
                    , checkpointLocation = chkDir)

我使用 foreachBatch 是因为我需要为每个输入微批处理编写多个接收器。 非常感谢大家可以尝试帮助我。

【问题讨论】:

    标签: pyspark spark-structured-streaming


    【解决方案1】:

    我已经在我的本地机器上尝试过这个并且适用于 Spark > 2.4。

    df.writeStream
      .foreachBatch((microBatchDF, microBatchId) => {     
        microBatchDF
          .withColumnRenamed("value", "body")
          .write
          .format("console")
          .option("checkpointLocation","checkPoint")
          .save()
      })
      .start()
      .awaitTermination()
    

    【讨论】:

    • 很好的答案。您介意解释一下缺少什么吗?
    • @f.khantsis 我想知道是否有像 start(path,checkpoint) 这样的重载方法。我不确定 PySpark。我们不需要为外部的流式查询指定开始和检查点。无论我们在做什么操作都在 forEachBatch 中。
    猜你喜欢
    • 2018-08-10
    • 1970-01-01
    • 1970-01-01
    • 2020-11-30
    • 1970-01-01
    • 2020-12-12
    • 2020-04-29
    • 2020-04-28
    • 1970-01-01
    相关资源
    最近更新 更多