【问题标题】:Managing Offsets with Spark Structured Batch Job with Kafka使用 Kafka 使用 Spark 结构化批处理作业管理偏移量
【发布时间】:2019-06-28 17:52:21
【问题描述】:

我有一个正在编写批处理作业的用例

我需要读取一个 Kafka 主题并将数据记录到 HDFS。我的代码如下所示

val df: DataFrame = spark.read
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("group.id", "test-cg")
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .load

df.write.
  parquet(buildPathWithCurrentBatchTime())

每次作业读取 Kafka 主题时,它都会从最早的偏移量开始,因此同一条消息会分批记录。 如何让我的作业在前一个作业实例读取的偏移量之后从偏移量开始读取消息。

我尝试设置检查点位置、组 ID 但没有帮助。

我不想使用流式查询。我有一个简单的用例来记录来自 Kafka 主题的数据。我没有任何延迟要求。唯一的要求是期刊中不能有任何重复。这是一个低优先级。如果我使用流式查询,它将一直使用执行器,这会浪费资源。因此我想分批做

【问题讨论】:

  • 添加选项(“startingOffsets”,“latest”)。 ...
  • 请说明你真正想让别人不要猜测你的意图。我认为您是在说“我有一个用例,我正在使用 Spark 结构化流 API 编写批处理作业。”在你的问题中。这会造成混淆 - 您没有使用结构化流,因为您没有将源和接收器初始化为 readStream / writeStream。

标签: apache-spark apache-kafka apache-spark-sql spark-structured-streaming


【解决方案1】:

您使用的是批量查询而不是流式查询。 (也许缺少点?)只需将 read 替换为 readStreamwrite 替换为 writeStream 即可。

编辑:正如 OP 澄清的那样,可以使用一次性触发器,我刚刚更新了代码以使用带有一次性触发器的结构化流。 (免责声明:我没有编译/运行代码,但更改适合结构化流式处理指南文档。)

val df: DataFrame = spark.readStream
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("group.id", "test-cg")
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .load

val query = df.writeStream
  .format("parquet")
  .option("path", buildPathWithCurrentBatchTime())
  .trigger(Trigger.Once())
  .start()

query.awaitTermination()

【讨论】:

  • 我不想使用流式查询。我有一个简单的用例来记录来自 Kafka 主题的数据。我没有任何延迟要求。唯一的要求是期刊中不能有任何重复。这是一个低优先级。如果我使用流式查询,它将一直使用执行器,这会浪费资源。因此我想分批做
  • 首先,如果您因为我建议流式查询而投反对票,请在您的问题中说明您的意图。 Spark 提供“一次”触发器,它运行单个微批处理并终止仍然检查点偏移信息。请参考databricks.com/blog/2017/05/22/…
  • 我更正了这个问题。请使用一次触发解决方案更新答案。
猜你喜欢
  • 1970-01-01
  • 2019-10-03
  • 1970-01-01
  • 2018-06-08
  • 2021-03-18
  • 2019-01-14
  • 2018-09-22
  • 2017-02-06
  • 2021-01-15
相关资源
最近更新 更多