【发布时间】:2020-08-07 17:53:54
【问题描述】:
我正在开发一个 Spark 结构化流作业,该作业从 Kafka 主题读取并写入 Jdbc 数据库。
数据库应该有一个维护窗口,我正试图找出一种在不中止工作的情况下处理该案例的方法。
我的代码:
// read data from kafka and transform into required DF.
val transformDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.kafkaBootstrapServers)
.option("startingOffsets", "latest")
.option("subscribePattern", config.topics)
.load()
.transform(toRaw)
//write
val query = transformDF
.writeStream
.option("checkpointLocation", config.checkpointLocation)
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write
.format("jdbc")
.option("url", config.url)
.option("user", config.username)
.option("password", config.password)
.option(JDBCOptions.JDBC_TABLE_NAME, tableName.get)
.option("stringtype", "unspecified")
.mode(SaveMode.Append)
.save()
})
}.outputMode(OutputMode.Append()).start()
try {
query.awaitTermination()
} catch {
case e: Exception => logger.error("Error", e)
}
现在,如果 DB 不可用,代码将进入异常块并中止。我想避免这种情况,相反,我希望停止进一步阅读消息。我试图避免重新提交作业的手动过程。
这可能吗?
火花:2.4.5
【问题讨论】:
-
在这种情况下,我建议尝试提供背压的 Akka Kafka(Alpakka)。这意味着当流的内部缓冲区因接收器不可用而已满时,它可以停止从代理中提取消息的过程。您也可以检查 DStream 的背压控制。
-
@EmiCareOfCell44 Akka Kafka 不适合我们。
-
有趣的是,Spark 官方文档中没有解决这个问题。此外,DEV OPP 的事情意味着优雅的终止。然后重新开始。
-
你试过ForeachWriter接口吗?您可以在其中实现自己的重试机制。
-
@mike 如果数据库在处理期间在那里脱机怎么办?在目标上至少需要幂等的东西?除非我错了
标签: scala apache-spark apache-spark-sql spark-streaming spark-structured-streaming