【发布时间】:2018-08-30 01:16:10
【问题描述】:
我是 Sacla 的新手。我想在从 Kafka 读取消息并写入 Cassandra DB 时处理流偏移事务。每次写入后,我都会向 Kafka 提交偏移量。如果 DB 写入时出现任何错误,我需要跳过 Kafka 偏移写入。
DB 出错时如何跳过 Kafka Offset 写入?
代码
kafkaStream.foreach(rdd=> {
rdd.foreachRDD(conRec=> {
val offsetRanges = conRec.asInstanceOf[HasOffsetRanges].offsetRanges
conRec.foreach(str=> {
try {
CassandraHelper.saveItemEvent(str.value())
}catch {
case ex: Exception => {
println(ex.getMessage)
}
}
})
rdd.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
})
【问题讨论】:
-
建议:Kafka Connect 有一个 Cassandra 连接器,可以在出错时为您保存偏移量
-
如果保存时发生任何错误,那么您的场景是什么?你想继续流式传输吗?如果是这样,您误解了 kakfa 的用途。错误消息的计划是什么?卡夫卡只是排队。使用您的消费者读取的组 ID,一旦您偏移消息,它将指向队列中的下一条消息。如果要跳过提交,则违反了kafka规则。
-
如果发生错误,取决于您是否要继续
-
@cricket_007 ;请分享任何示例代码
-
@sai pradeep kumar kotha 我不想将错误记录提交给 Kafka。
标签: scala apache-spark apache-kafka rdd offset