【发布时间】:2017-12-19 09:57:01
【问题描述】:
我们有一个 spark 流程序,它从 kafka 中提取消息并使用forEachPartiton 转换处理每个单独的消息。
如果处理函数中存在特定错误,我们希望抛出异常并停止程序。同样的事情似乎没有发生。下面是我们尝试执行的代码。
JavaInputDStream<KafkaDTO> stream = KafkaUtils.createDirectStream( ...);
stream.foreachRDD(new Function<JavaRDD<KafkaDTO>, Void>() {
public Void call(JavaRDD<KafkaDTO> rdd) throws PropertiesLoadException, Exception {
rdd.foreachPartition(new VoidFunction<Iterator<KafkaDTO>>() {
@Override
public void call(Iterator<KafkaDTO> itr) throws PropertiesLoadException, Exception {
while (itr.hasNext()) {
KafkaDTO dto = itr.next();
try{
//process the message here.
} catch (PropertiesLoadException e) {
// throw Exception if property file is not found
throw new PropertiesLoadException(" PropertiesLoadException: "+e.getMessage());
} catch (Exception e) {
throw new Exception(" Exception : "+e.getMessage());
}
}
}
});
}
}
在上面的代码中,即使我们抛出PropertiesLoadException,程序也不会停止,流式传输会继续。我们在 Spark 配置中设置的最大重试次数仅为 4。流式传输程序即使在 4 次失败后仍会继续。应该如何抛出异常来停止程序?
【问题讨论】:
标签: java apache-spark spark-streaming