【问题标题】:Spark error handling - Inside Dataset map operationSpark 错误处理 - 内部数据集映射操作
【发布时间】:2021-09-01 07:28:39
【问题描述】:

我正在尝试收集在映射操作期间发生的所有错误,然后将错误写入每个项目的单独文件中。

以下是示例代码。 (这不是确切的代码。字符串列表将被自定义类型替换,并将保存地图迭代中使用的一些值,例如。实际数据集可能有一个 id、序列号和字符串数据,所以如果发生错误,我应该有信息错误集合中的 id 和序列号以及错误消息。)

List<String> errorList = new ArrayList<>();

Dataset<Integer> ds = spark.createDataset(Arrays.asList(1111, 0, 31111, 0, 51111, 61111, 71111, 0),
        Encoders.INT());

Dataset<Integer> result = ds.map((MapFunction<Integer, Integer>) value -> {
    try {
        System.out.print(value);
        if (value / 0 == 0)
            System.out.println("sucess");

    } catch (Exception ex) {
        ex.printStackTrace();
        errorList.add("error occured for - " + value);
    }
    return value;
}, Encoders.INT());

errorList.stream().forEach(System.out::println);

我没有看到 errorlist 的输出(在本地模式下从 eclipse IDE 运行这个 spark)。 是不是有什么错误或者这种全局变量的概念是不可能的?

【问题讨论】:

    标签: java apache-spark hadoop apache-spark-sql


    【解决方案1】:

    您在驱动程序上初始化列表,然后map 在执行程序上运行。 errorList 将被序列化,发送到执行程序并反序列化,因此错误将添加到 errorList 的副本中。 然后您尝试在显然为空的驱动程序上打印 errorList 的值。

    如果您想收集有关驱动程序的错误,您可以在map 中返回错误。

    更多关于火花序列化https://stackoverflow.com/a/40818002/9630486.

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-09-29
      • 2018-10-07
      • 2019-09-07
      • 1970-01-01
      • 2021-03-14
      • 1970-01-01
      • 1970-01-01
      • 2022-09-23
      相关资源
      最近更新 更多