【问题标题】:Duplicates while publishing data to kafka topic using spark-streaming使用 spark-streaming 将数据发布到 kafka 主题时重复
【发布时间】:2019-10-06 23:27:57
【问题描述】:

我有 spark-streaming 应用程序,它使用来自 topic1 的数据并解析它,然后将相同的记录发布到 2 个进程中,一个是 topic2,另一个是 hive 表。在将数据发布到 kafka topic2 时,我看到重复项,但在配置单元表中看不到重复项

使用 火花 2.2,卡夫卡 0.10.0

KafkaWriter.write(spark, storeSalesStreamingFinalDF, config)
writeToHIVE(spark, storeSalesStreamingFinalDF, config)


object KafkaWriter {

  def write(spark: SparkSession, df: DataFrame, config: Config)
  {
    df.select(to_json(struct("*")) as 'value)
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", config.getString("kafka.dev.bootstrap.servers"))
      .option("topic",config.getString("kafka.topic"))
      .option("kafka.compression.type",config.getString("kafka.compression.type"))
      .option("kafka.session.timeout.ms",config.getString("kafka.session.timeout.ms"))
      .option("kafka.request.timeout.ms",config.getString("kafka.request.timeout.ms"))
      .save()
  }
}

有人可以帮忙吗,

预计 kafka topic2 中没有重复项。

【问题讨论】:

  • 你解决了吗?
  • 还没有,在消费时处理重复。仍在寻找在发布时消除的选项

标签: apache-spark apache-kafka spark-streaming kafka-producer-api spark-streaming-kafka


【解决方案1】:

要处理重复数据,我们应该设置.option("kafka.processing.guarantee","exactly_once")

【讨论】:

  • 这没有帮助,我仍然看到重复
猜你喜欢
  • 2023-03-29
  • 2019-06-28
  • 1970-01-01
  • 2019-07-12
  • 2017-02-07
  • 2017-09-28
  • 2020-08-04
  • 2017-09-30
  • 2019-02-19
相关资源
最近更新 更多