【发布时间】:2019-10-29 17:52:58
【问题描述】:
我正在使用 spark 2.4.0 和 python 3.6。我正在开发一个带有 pyspark 结构化流操作的 python 程序。该程序从两个套接字运行两个读取流,然后将这两个流数据帧合并。我尝试了 spark 2.4.0 和 2.4.3,但没有任何改变。 然后我执行一个独特的写入流,以便只写入一个输出流数据帧。效果很好。 但是,由于我还需要为所有微批处理编写一个非流数据集,所以我在 writestream 中编写了一个 foreachBatch 调用。那行不通。
我将 spark.scheduler.mode=FAIR 放在 spark.defaults.conf 中。我正在运行 spark-submit,但即使我直接尝试使用 python3,它也根本不起作用。看起来它没有执行 foreachBatch 中提到的 splitStream 函数。我尝试在 splitStream 函数中添加一些打印,但没有任何效果。
我做了很多尝试,但没有任何改变,我通过 spark-submit 和 python 提交。我正在开发一个 spark 独立集群。
inDF_1 = spark \
.readStream \
.format('socket') \
.option('host', host_1) \
.option('port', port_1) \
.option("maxFilesPerTrigger", 1) \
.load()
inDF_2 = spark \
.readStream \
.format('socket') \
.option('host', host_2) \
.option('port', port_2) \
.option("maxFilesPerTrigger", 1) \
.load() \
.coalesce(1)
inDF = inDF_1.union(inDF_2)
#--------------------------------------------------#
# write streaming raw dataser R-01 plateMeasures #
#--------------------------------------------------#
def splitStream(df, epoch_id):
df \
.write \
.format('text') \
.outputMode('append') \
.start(path = outDir0)
listDF = df.collect()
print(listDF)
pass
stageDir = dLocation.getLocationDir('R-00')
outDir0 = dLocation.getLocationDir(outList[0])
chkDir = dLocation.getLocationDir('CK-00')
query0 = programName + '_q0'
q0 = inDF_1 \
.writeStream \
.foreachBatch(splitStream) \
.format('text') \
.outputMode('append') \
.queryName(query0) \
.start(path = stageDir
, checkpointLocation = chkDir)
我使用 foreachBatch 是因为我需要为每个输入微批处理编写多个接收器。 非常感谢大家可以尝试帮助我。
【问题讨论】:
标签: pyspark spark-structured-streaming