【发布时间】:2018-03-18 15:20:19
【问题描述】:
我正在尝试从 Kafka 进行结构化流式传输。我打算将检查点存储在 HDFS 中。我阅读了一篇 Cloudera 博客,建议不要将检查点存储在 HDFS 中以进行 Spark 流式传输。结构流检查点是否也是同样的问题。 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/.
在结构化流中,如果我的 spark 程序关闭了一段时间,我如何从检查点目录获取最新的偏移量并在该偏移量之后加载数据。 我将检查点存储在如下所示的目录中。
df.writeStream\
.format("text")\
.option("path", '\files') \
.option("checkpointLocation", 'checkpoints\chkpt') \
.start()
更新:
这是我的结构化流程序读取 Kafka 消息,解压缩并写入 HDFS。
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.option("failOnDataLoss", "false")\
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream
query = decomp.writeStream\
.format("text")\
.option("path", \Data_directory_inHDFS) \
.option("checkpointLocation", \pathinDHFS\) \
.start()
query.awaitTermination()
【问题讨论】:
-
您确定博客建议您不要将检查点存储在 HDFS 上吗?这很奇怪。你有链接吗?对于结构化流问题,只需使用相同的检查点目录运行相同的代码,结构化流将获取最后的失败偏移量并从中重新启动。
-
@zsxwing 这是cloudera 博客链接blog.cloudera.com/blog/2017/06/… 我手动杀死了我的流媒体程序一分钟,然后再次启动它,它开始处理它仅在启动后收到的消息。它在关闭时忽略了错过的消息,并且不再处理它们
-
能不能看一下驱动日志,找到
logInfo(s"GetBatch called with start = $start, end = $end")输出的日志?它应该告诉您查询处理了什么。
标签: hadoop pyspark spark-structured-streaming