【问题标题】:Java Kafka Structured StreamingJava Kafka 结构化流
【发布时间】:2021-08-23 02:33:35
【问题描述】:

我必须通过 Spark 从 Kafka 执行批量查询(基本上是在一个循环中),每次都从上一次迭代中读取的最后一个偏移量开始,这样我就只读取新数据。

Dataset<Row> df = spark
                .read()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "test-reader")
                .option("enable.auto.commit", true)
                .option("kafka.group.id", "demo-reader") //not sure about the one to use
                .option("group.id", "demo-reader")
                .option("startingOffset", "latest")
                .load()

批处理查询似乎不支持latest。我想知道是否有可能以另一种方式做类似的事情(不直接处理偏移量)。

编辑: earliest 似乎检索到了主题中包含的全部数据。

【问题讨论】:

  • 你能解释一下你想要做批处理循环而不是流式传输吗?
  • 我有限制一起处理相同“类型”的数据,有一个逻辑说明何时处理某个批次。流媒体没有这些区别,而只考虑句点
  • 我相信至少在 spark 2.4.3 之前这是不可能的(消费者到偏移映射基于组 id,这在 spark 2.4.3 [spark.apache.org/docs/2.4.3/… 中不受支持。但是似乎火花3.1 确实支持消费者组 id [spark.apache.org/docs/3.1.2/…。您使用的是哪个版本?
  • 我在 3.1.1,但仍然没有,它不起作用

标签: java apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

您可以尝试用earliest 代替latest 代替startingOffsets,如下例所示:

Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-reader")
  .option("enable.auto.commit", true)
  .option("kafka.group.id", "demo-reader") //not sure about the one to use
  .option("group.id", "demo-reader")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();

请参考spark docs

根据文档,您应该使用“最新”进行流式传输,使用“最早”进行批处理。

【讨论】:

  • 我仍在了解整个主题
  • 我希望你原来的问题已经解决了。
  • 很遗憾,我只想要以前没有查询过的数据
  • 在描述中您提到“批处理查询中似乎不支持latest。我想知道是否可以以另一种方式做类似的事情(不直接处理偏移量)”。如果这不是问题,您能否更新您的问题?
  • 也许我没有得到“最新”的目的。据我了解,这应该在startingOffset中使用,以仅使用消费者尚未处理的数据。如果有错,请您澄清一下?
猜你喜欢
  • 2020-10-18
  • 2018-03-18
  • 2020-01-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-24
  • 1970-01-01
相关资源
最近更新 更多