【问题标题】:Spark Streaming failing due to error on a different Kafka topic than the one being read由于 Kafka 主题与正在读取的主题不同,Spark Streaming 失败
【发布时间】:2019-11-23 15:16:06
【问题描述】:

对于以下写入主题/阅读主题air2008rand tandem:

import org.apache.spark.sql.streaming.Trigger
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("subscribe", "air2008rand")
.load()
.groupBy('value.cast("string").as('key))
.agg(count("*").cast("string") as 'value)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("includeTimestamp", true)
.option("topic","t1")
.trigger(Trigger.ProcessingTime("2 seconds"))
.outputMode("update")
.option("checkpointLocation","/tmp/cp")
.start)

由于一个不同主题air2008m1-0产生了一个错误:

scala> 19/07/14 13:27:22 ERROR MicroBatchExecution: Query [id = 711d44b2-3224-4493-8677-e5c8cc4f3db4, runId = 68a3519a-e9cf-4a82-9d96-99be833227c0] 
terminated with error
java.lang.IllegalStateException: Set(air2008m1-0) are gone. 
Some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$reportDataLoss(KafkaMicroBatchReader.scala:261)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.planInputPartitions(KafkaMicroBatchReader.scala:124)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)

可以通过停止读/写代码(在 spark-shell repl 中)然后重新运行它来重复此行为。

为什么这里不同的kafka主题之间会出现“串扰”?

【问题讨论】:

    标签: apache-spark apache-kafka spark-structured-streaming


    【解决方案1】:

    问题是由于检查点目录包含来自早期火花流操作的数据。解决方法是更改​​检查点目录。

    在这个问题[IllegalStateException]: Spark Structured Streaming is termination Streaming Query with Error 中作为评论(来自@jaceklaskowski 本人)找到了解决方案

    【讨论】:

      猜你喜欢
      • 2019-06-24
      • 1970-01-01
      • 2019-07-12
      • 1970-01-01
      • 2023-03-25
      • 2017-11-10
      • 1970-01-01
      • 2018-01-09
      • 2021-01-05
      相关资源
      最近更新 更多