【发布时间】:2021-09-01 07:28:39
【问题描述】:
我正在尝试收集在映射操作期间发生的所有错误,然后将错误写入每个项目的单独文件中。
以下是示例代码。 (这不是确切的代码。字符串列表将被自定义类型替换,并将保存地图迭代中使用的一些值,例如。实际数据集可能有一个 id、序列号和字符串数据,所以如果发生错误,我应该有信息错误集合中的 id 和序列号以及错误消息。)
List<String> errorList = new ArrayList<>();
Dataset<Integer> ds = spark.createDataset(Arrays.asList(1111, 0, 31111, 0, 51111, 61111, 71111, 0),
Encoders.INT());
Dataset<Integer> result = ds.map((MapFunction<Integer, Integer>) value -> {
try {
System.out.print(value);
if (value / 0 == 0)
System.out.println("sucess");
} catch (Exception ex) {
ex.printStackTrace();
errorList.add("error occured for - " + value);
}
return value;
}, Encoders.INT());
errorList.stream().forEach(System.out::println);
我没有看到 errorlist 的输出(在本地模式下从 eclipse IDE 运行这个 spark)。 是不是有什么错误或者这种全局变量的概念是不可能的?
【问题讨论】:
标签: java apache-spark hadoop apache-spark-sql