【问题标题】:How to throw exception in spark streaming如何在火花流中抛出异常
【发布时间】:2017-12-19 09:57:01
【问题描述】:

我们有一个 spark 流程序,它从 kafka 中提取消息并使用forEachPartiton 转换处理每个单独的消息。

如果处理函数中存在特定错误,我们希望抛出异常并停止程序。同样的事情似乎没有发生。下面是我们尝试执行的代码。

JavaInputDStream<KafkaDTO> stream = KafkaUtils.createDirectStream( ...);

stream.foreachRDD(new Function<JavaRDD<KafkaDTO>, Void>() {

    public Void call(JavaRDD<KafkaDTO> rdd) throws PropertiesLoadException, Exception {    
         rdd.foreachPartition(new VoidFunction<Iterator<KafkaDTO>>() {

             @Override
             public void call(Iterator<KafkaDTO> itr) throws PropertiesLoadException, Exception {
                 while (itr.hasNext()) {
                     KafkaDTO dto = itr.next();
                     try{
                       //process the message here.
                     } catch (PropertiesLoadException e) {
                         // throw Exception if property file is not found
                         throw new PropertiesLoadException(" PropertiesLoadException: "+e.getMessage());
                     } catch (Exception e) {
                         throw new Exception(" Exception : "+e.getMessage());
                     }
                 }
             }
         });
     }
 }

在上面的代码中,即使我们抛出PropertiesLoadException,程序也不会停止,流式传输会继续。我们在 Spark 配置中设置的最大重试次数仅为 4。流式传输程序即使在 4 次失败后仍会继续。应该如何抛出异常来停止程序?

【问题讨论】:

    标签: java apache-spark spark-streaming


    【解决方案1】:

    我不确定这是否是最好的方法,但我们用 try 和 catch 包围了主批处理,当我遇到异常时,我只是调用 close context。此外,您需要确保停止优雅地关闭 (false)。

    示例代码:

    try {
        process(dataframe);
    } catch (Exception e) {
        logger.error("Failed on write - will stop spark context immediately!!" + e.getMessage());
        closeContext(jssc);
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        throw e;
    }
    

    并关闭功能:

    private void closeContext(JavaStreamingContext jssc) {
        logger.warn("stopping the context");
        jssc.stop(false, jssc.sparkContext().getConf().getBoolean("spark.streaming.stopGracefullyOnShutdown", false));
        logger.error("Context was stopped");
    }
    

    在配置中:

    spark.streaming.stopGracefullyOnShutdown false

    我认为你的代码应该是这样的:

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, streamBatch);
    JavaInputDStream<KafkaDTO> stream = KafkaUtils.createDirectStream( jssc, ...);
    
        stream.foreachRDD(new Function<JavaRDD<KafkaDTO>, Void>() {
    
            public Void call(JavaRDD<KafkaDTO> rdd) throws PropertiesLoadException, Exception {
    
                try {
    
                    rdd.foreachPartition(new VoidFunction<Iterator<KafkaDTO>>() {
    
                        @Override
                        public void call(Iterator<KafkaDTO> itr) throws PropertiesLoadException, Exception {
                            while (itr.hasNext()) {
                                KafkaDTO dto = itr.next();
                                try {
                                    //process the message here.
                                } catch (PropertiesLoadException e) {
                                    // throw Exception if property file is not found
                                    throw new PropertiesLoadException(" PropertiesLoadException: " + e.getMessage());
                                } catch (Exception e) {
                                    throw new Exception(" Exception : " + e.getMessage());
                                }
                            }
                        }
                    });
    
                } catch (Exception e){
                    logger.error("Failed on write - will stop spark context immediately!!" + e.getMessage());
                    closeContext(jssc);
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    throw e;
                }
    
            }
        }
    

    另外请注意,我的流在 spark 2.1 Standalone(不是 yarn / mesos)客户端模式下工作。此外,我使用 ZK 优雅地实现了停止。

    【讨论】:

    • 感谢您的回复,但如何从内部方法访问“jssc”。 jsss 在驱动程序中,但在执行程序中捕获了异常。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-02
    • 2017-02-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-30
    相关资源
    最近更新 更多