【发布时间】:2019-06-28 17:52:21
【问题描述】:
我有一个正在编写批处理作业的用例
我需要读取一个 Kafka 主题并将数据记录到 HDFS。我的代码如下所示
val df: DataFrame = spark.read
.format("kafka")
.option("subscribe", "test-topic")
.option("includeTimestamp", true)
.option("kafka.bootstrap.servers", "localhost:9092")
.option("group.id", "test-cg")
.option("checkpointLocation", "/group/test/checkpointsDir")
.load
df.write.
parquet(buildPathWithCurrentBatchTime())
每次作业读取 Kafka 主题时,它都会从最早的偏移量开始,因此同一条消息会分批记录。 如何让我的作业在前一个作业实例读取的偏移量之后从偏移量开始读取消息。
我尝试设置检查点位置、组 ID 但没有帮助。
我不想使用流式查询。我有一个简单的用例来记录来自 Kafka 主题的数据。我没有任何延迟要求。唯一的要求是期刊中不能有任何重复。这是一个低优先级。如果我使用流式查询,它将一直使用执行器,这会浪费资源。因此我想分批做
【问题讨论】:
-
添加选项(“startingOffsets”,“latest”)。 ...
-
请说明你真正想让别人不要猜测你的意图。我认为您是在说“我有一个用例,我正在使用 Spark 结构化流 API 编写批处理作业。”在你的问题中。这会造成混淆 - 您没有使用结构化流,因为您没有将源和接收器初始化为 readStream / writeStream。
标签: apache-spark apache-kafka apache-spark-sql spark-structured-streaming