【问题标题】:Spark - Kafka offset management while eror scenarioSpark - 错误场景时的 Kafka 偏移管理
【发布时间】:2018-08-30 01:16:10
【问题描述】:

我是 Sacla 的新手。我想在从 Kafka 读取消息并写入 Cassandra DB 时处理流偏移事务。每次写入后,我都会向 Kafka 提交偏移量。如果 DB 写入时出现任何错误,我需要跳过 Kafka 偏移写入。

DB 出错时如何跳过 Kafka Offset 写入?

代码

   kafkaStream.foreach(rdd=> {
      rdd.foreachRDD(conRec=> {
        val offsetRanges = conRec.asInstanceOf[HasOffsetRanges].offsetRanges
        conRec.foreach(str=> {
          try {
            CassandraHelper.saveItemEvent(str.value())
          }catch {
            case ex: Exception => {
              println(ex.getMessage)
            }
          }
        })
        rdd.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      })
    })

【问题讨论】:

  • 建议:Kafka Connect 有一个 Cassandra 连接器,可以在出错时为您保存偏移量
  • 如果保存时发生任何错误,那么您的场景是什么?你想继续流式传输吗?如果是这样,您误解了 kakfa 的用途。错误消息的计划是什么?卡夫卡只是排队。使用您的消费者读取的组 ID,一旦您偏移消息,它将指向队列中的下一条消息。如果要跳过提交,则违反了kafka规则。
  • 如果发生错误,取决于您是否要继续
  • @cricket_007 ;请分享任何示例代码
  • @sai pradeep kumar kotha 我不想将错误记录提交给 Kafka。

标签: scala apache-spark apache-kafka rdd offset


【解决方案1】:

所以这里有一个你可以做的小玩具示例

val ds = spark.createDataset(1 to 1000)
import scala.util.{Try, Success, Failure}
//simulates intermittent failure
def writeToCassandra(i:Int):Either[List[String], Unit] = if(i % 11 == 0) Left(List(s"fail $i")) else Right(())
//in stream rdd
//read offset
val succeded = ds.rdd.map(writeToCassandra).reduce {
  case (Right(_), Right(_)) => Right(())
  case (Right(_), Left(err)) => Left(err)
  case (Left(err), Right(_)) => Left(err)
  case (Left(err1), Left(err2)) => Left(err1 ::: err2)
}

succeded match {
  case Right(_) => //commit offsets
  case Left(errList) => // log errors and clean up messages by e.g. writing them to an err topic and then commiting offsets or whatever you want
}

由于网络 IO 减少,请注意您在错误类型(Ether 的左侧)中输入的内容。此外,这不会在第一次失败时中止,而是尽最大努力编写(尽可能写,但要注意失败的写入)

编辑: 稍微想了想,我可能会这样做

val succeded = ds.rdd.mapPartitions(
    partition => {
      List(
        partition.foldLeft[Either[List[String],Unit]](Right(()))(
          (either, entry) => either match {
            case Right(()) => writeToCassandra(entry)
            case Left(errs) => Left(errs)
          }
        )
      ).iterator
    }
  )

val overall = succeded.reduce {
  case (Right(_), Right(_)) => Right(())
  case (Right(_), Left(err)) => Left(err)
  case (Left(err), Right(_)) => Left(err)
  case (Left(err1), Left(err2)) => Left(err1 ::: err2)
}

println(overall)

这应该只收集每个分区的第一个错误,并且只要它没有失败,就可以逐个记录地写入分区记录。如果遇到第一个错误,可能会更慢,但不会炸毁错误堆栈并停止写入。

如果您不关心这一点,您还可以限制组合器/减少中的错误消息数量

【讨论】:

    猜你喜欢
    • 2018-09-22
    • 2021-01-15
    • 2017-07-17
    • 2017-02-06
    • 2019-10-03
    • 2018-04-12
    • 2021-05-22
    • 2021-03-05
    • 2019-07-30
    相关资源
    最近更新 更多