【问题标题】:Kafka Structured Streaming checkpointKafka 结构化流检查点
【发布时间】:2018-03-18 15:20:19
【问题描述】:

我正在尝试从 Kafka 进行结构化流式传输。我打算将检查点存储在 HDFS 中。我阅读了一篇 Cloudera 博客,建议不要将检查点存储在 HDFS 中以进行 Spark 流式传输。结构流检查点是否也是同样的问题。 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/.

在结构化流中,如果我的 spark 程序关闭了一段时间,我如何从检查点目录获取最新的偏移量并在该偏移量之后加载数据。 我将检查点存储在如下所示的目录中。

 df.writeStream\
        .format("text")\
        .option("path", '\files') \
        .option("checkpointLocation", 'checkpoints\chkpt') \
        .start()

更新:

这是我的结构化流程序读取 Kafka 消息,解压缩并写入 HDFS。

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KafkaServer) \
        .option("subscribe", KafkaTopics) \
        .option("failOnDataLoss", "false")\
         .load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()

decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream

query = decomp.writeStream\
    .format("text")\
    .option("path", \Data_directory_inHDFS) \
    .option("checkpointLocation", \pathinDHFS\) \
    .start()

query.awaitTermination()

【问题讨论】:

  • 您确定博客建议您不要将检查点存储在 HDFS 上吗?这很奇怪。你有链接吗?对于结构化流问题,只需使用相同的检查点目录运行相同的代码,结构化流将获取最后的失败偏移量并从中重新启动。
  • @zsxwing 这是cloudera 博客链接blog.cloudera.com/blog/2017/06/… 我手动杀死了我的流媒体程序一分钟,然后再次启动它,它开始处理它仅在启动后收到的消息。它在关闭时忽略了错过的消息,并且不再处理它们
  • 能不能看一下驱动日志,找到logInfo(s"GetBatch called with start = $start, end = $end")输出的日志?它应该告诉您查询处理了什么。

标签: hadoop pyspark spark-structured-streaming


【解决方案1】:

最好将检查点存储在长期存储(HDFS、AWS S3 等)上。我想在这里补充一点,属性“failOnDataLoss”不应设置为 false,因为这不是最佳实践。数据丢失是没有人愿意承受的。休息吧,你是在正确的道路上。

【讨论】:

  • 在这样做的同时,如何防止 HDFS 中的检查点随着时间的推移使用越来越多的存储空间?是否有任何“清理”配置可以用来管理它?
  • 据我所知,检查点存储的数据不多,它存储偏移量就像Kafka一样,所以你不必担心存储问题,如果你想清除检查点,你可以这样做它在维护期间,或者你可以为此设置一个调度程序。
  • 我在 SparkConf 上使用“spark.cleaner.referenceTracking.cleanCheckpoints”,“true”,为我工作清理检查点。
【解决方案2】:

在结构化流中,如果我的 spark 程序关闭了一段时间, 如何从检查点目录获取最新偏移量并加载数据 在那个偏移之后。

在您的 checkpointdir 文件夹下,您会找到一个名为“offsets”的文件夹。文件夹“偏移量”维护要从 kafka 请求的下一个偏移量。打开'offsets'文件夹下的最新文件(最新的批处理文件),下一个预期的偏移量将在下面的格式

{"kafkatopicname":{"2":16810618,"1":16810853,"0":91332989}}

要在该偏移量之后加载数据,请将以下属性设置为您的 spark 读取流

 .option("startingOffsets", "{\""+topic+"\":{\"0\":91332989,\"1\":16810853,\"2\":16810618}}")

0,1,2 是 topic 中的分区。

【讨论】:

    【解决方案3】:

    据我了解,它建议在以下位置维护偏移管理:Hbase、Kafka、HDFS 或 Zookeeper。

    "值得一提的是,您还可以将偏移量存储在存储中 像 HDFS 这样的系统。在 HDFS 中存储偏移量是一种不太流行的方法 与上述选项相比,HDFS 具有更高的延迟 ZooKeeper 和 HBase 等其他系统。”

    您可以在 Spark 文档中找到如何从现有检查点重新启动查询:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

    【讨论】:

      【解决方案4】:

      在您的查询中,尝试应用检查点,同时将结果以某种格式(如 parquet)写入 HDFS 等持久​​性存储。它对我很好。

      【讨论】:

      • 我在问题中添加了完整代码作为更新。您是如何从检查点文件中获取最新偏移量的。
      • 在这样做的同时,如何防止 HDFS 中的检查点随着时间的推移使用越来越多的存储空间?是否有任何“清理”配置可以用来管理它?
      猜你喜欢
      • 2019-09-13
      • 2018-06-22
      • 2023-03-10
      • 1970-01-01
      • 2021-08-23
      • 2021-12-03
      • 2017-06-19
      • 1970-01-01
      • 2020-10-18
      相关资源
      最近更新 更多