【发布时间】:2020-10-07 12:39:25
【问题描述】:
背景: 我编写了一个简单的 spark 结构化蒸汽应用程序来将数据从 Kafka 移动到 S3。发现为了支持一次性保证,spark 创建了 _spark_metadata 文件夹,该文件夹最终变得太大,当流式应用程序运行很长时间时,元数据文件夹变得如此之大,以至于我们开始出现 OOM 错误。我想摆脱 Spark Structured Streaming 的元数据和检查点文件夹并自己管理偏移量。
我们如何在 Spark Streaming 中管理偏移量: 我使用 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 在 Spark Structured Streaming 中获取偏移量。但是想知道如何使用 Spark Structured Streaming 获取偏移量和其他元数据来管理我们自己的检查点。你有实现检查点的示例程序吗?
我们如何管理 Spark Structured Streaming 中的偏移量?? 看着这个 JIRA https://issues-test.apache.org/jira/browse/SPARK-18258。看起来没有提供偏移量。我们应该怎么做?
问题是在 6 小时内元数据的大小增加到 45MB,并且增长到接近 13GB。分配的驱动程序内存为 5GB。当时系统因OOM而崩溃。想知道如何避免让这些元数据变得如此庞大?如何让元数据不记录这么多信息。
代码:
1. Reading records from Kafka topic
Dataset<Row> inputDf = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.option("startingOffsets", "earliest") \
.load()
2. Use from_json API from Spark to extract your data for further transformation in a dataset.
Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
....withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table of above dataset using SQLContext
SQLContext sqlContext = new SQLContext(sparkSession);
dataDf.createOrReplaceTempView("event");
4. Flatten events since Parquet does not support hierarchical data.
5. Store output in parquet format on S3
StreamingQuery query = flatDf.writeStream().format("parquet")
数据集 dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event")) .select("event.metadata", "event.data", "event.connection", "event.registration_event","event.version_event" ); SQLContext sqlContext = new SQLContext(sparkSession); dataDf.createOrReplaceTempView("事件"); 数据集 flatDf = sqlContext .sql("select " + " date, time, id, " + flattenSchema(EVENT_SCHEMA, "event") + " from event"); StreamingQuery 查询 = flatDf .writeStream() .outputMode("追加") .option("压缩", "snappy") .format("镶木地板") .option(“检查点位置”,检查点位置) .option(“路径”,输出路径) .partitionBy("日期", "时间", "id") .trigger(Trigger.ProcessingTime(triggerProcessingTime)) 。开始(); query.awaitTermination();
【问题讨论】:
-
批量还是非批量?
-
添加评论以供参考
-
请添加您的代码
-
45mb 不算什么
-
不相信相关的事情,请显示错误
标签: apache-spark spark-streaming spark-structured-streaming