【问题标题】:How we manage offsets in Spark Structured Streaming? (Issues with _spark_metadata )我们如何管理 Spark Structured Streaming 中的偏移量? ( _spark_metadata 的问题)
【发布时间】:2020-10-07 12:39:25
【问题描述】:

背景: 我编写了一个简单的 spark 结构化蒸汽应用程序来将数据从 Kafka 移动到 S3。发现为了支持一次性保证,spark 创建了 _spark_metadata 文件夹,该文件夹最终变得太大,当流式应用程序运行很长时间时,元数据文件夹变得如此之大,以至于我们开始出现 OOM 错误。我想摆脱 Spark Structured Streaming 的元数据和检查点文件夹并自己管理偏移量。

我们如何在 Spark Streaming 中管理偏移量: 我使用 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 在 Spark Structured Streaming 中获取偏移量。但是想知道如何使用 Spark Structured Streaming 获取偏移量和其他元数据来管理我们自己的检查点。你有实现检查点的示例程序吗?

我们如何管理 Spark Structured Streaming 中的偏移量?? 看着这个 JIRA https://issues-test.apache.org/jira/browse/SPARK-18258。看起来没有提供偏移量。我们应该怎么做?

问题是在 6 小时内元数据的大小增加到 45MB,并且增长到接近 13GB。分配的驱动程序内存为 5GB。当时系统因OOM而崩溃。想知道如何避免让这些元数据变得如此庞大?如何让元数据不记录这么多信息。

代码:

1. Reading records from Kafka topic
  Dataset<Row> inputDf = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("startingOffsets", "earliest") \
  .load()
2. Use from_json API from Spark to extract your data for further transformation in a dataset.
   Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
       ....withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table of above dataset using SQLContext
   SQLContext sqlContext = new SQLContext(sparkSession);
   dataDf.createOrReplaceTempView("event");
4. Flatten events since Parquet does not support hierarchical data.
5. Store output in parquet format on S3
   StreamingQuery query = flatDf.writeStream().format("parquet")

数据集 dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event")) .select("event.metadata", "event.data", "event.connection", "event.registration_event","event.version_event" ); SQLContext sqlContext = new SQLContext(sparkSession); dataDf.createOrReplaceTempView("事件"); 数据集 flatDf = sqlContext .sql("select " + " date, time, id, " + flattenSchema(EVENT_SCHEMA, "event") + " from event"); StreamingQuery 查询 = flatDf .writeStream() .outputMode("追加") .option("压缩", "snappy") .format("镶木地板") .option(“检查点位置”,检查点位置) .option(“路径”,输出路径) .partitionBy("日期", "时间", "id") .trigger(Trigger.ProcessingTime(triggerProcessingTime)) 。开始(); query.awaitTermination();

【问题讨论】:

  • 批量还是非批量?
  • 添加评论以供参考
  • 请添加您的代码
  • 45mb 不算什么
  • 不相信相关的事情,请显示错误

标签: apache-spark spark-streaming spark-structured-streaming


【解决方案1】:

对于非批量 Spark Structured Streaming KAFKA 集成:

引用:

结构化流式处理忽略 Apache Kafka 中的偏移量提交。

相反,它依赖于驱动端自己的偏移管理,负责将偏移分配给执行者和 用于在处理回合结束时对它们进行检查点(epoch 或 微批次)。

如果您遵循 Spark KAFKA 集成指南,您不必担心。

优秀参考:https://www.waitingforcode.com/apache-spark-structured-streaming/apache-spark-structured-streaming-apache-kafka-offsets-management/read

batch 的情况不同,你需要自己管理并存储偏移量。

更新 根据 cmets,我建议问题略有不同,并建议您查看 Spark Structured Streaming Checkpoint Cleanup。除了您更新的 cmets 和没有错误的事实之外,我建议您对 Spark Structured Streaming https://www.waitingforcode.com/apache-spark-structured-streaming/checkpoint-storage-structured-streaming/read 的元数据进行咨询。看代码,和我的风格不同,但看不出有什么明显的错误。

【讨论】:

  • 感谢您的回复。我编写了一个简单的 spark 结构化蒸汽应用程序来将数据从 Kafka 移动到 S3。发现为了支持一次性保证,spark 创建了 _spark_metadata 文件夹,该文件夹最终变得太大,因为流应用程序应该永远运行。但是当流媒体应用程序运行很长时间时,元数据文件夹变得如此之大,以至于我们开始出现 OOM 错误。解决 OOM 的唯一方法是删除 Checkpoint 和 Metadata 文件夹并丢失有价值的客户数据。 Spark open JIRA SPARK-24295 和 SPARK-29995、SPARK-30462 和 SPARK-24295)
  • 这是目前情况的不利方面。关于人们试图缓解这种情况的各种帖子,但使用检查点没有问题。它将以非批量方式重新启动。很困惑,因为我刚刚重新阅读了关于 SSS 的 Databricks 指南以获得认证,但他们对此只字未提。使用预写日志等和检查点应该没有问题。不过,我注意到他们课程中的各种错误。主要问题是模式更改等过于停止。
  • 您可能需要修改您的问题和标题。
  • 非常感谢@thebluphantom.. 正在考虑回到 SPARK STREAMING(批处理版)。看起来指定检查点是强制性的。我试图删除检查点...我现在无法启动应用程序。 2020-06-17 20:00:04,222 错误 [驱动程序] org.apache.spark.deploy.yarn.ApplicationMaster:用户类抛出异常:org.apache.spark.sql.AnalysisException:必须通过选项指定检查点位置(“ checkpointLocation", ...) 或
  • 那么如何摆脱这个由_spark_metadata size增长这么大引起的OOM...
猜你喜欢
  • 2021-05-22
  • 2020-09-03
  • 1970-01-01
  • 2018-09-22
  • 2019-07-30
  • 2019-10-15
  • 2017-06-22
  • 2018-04-26
  • 2023-03-13
相关资源
最近更新 更多