【发布时间】:2019-01-08 08:55:15
【问题描述】:
我希望有人可以帮助我解决 Spark 问题。
我在一个数据集 (ds_ids) 中有一堆 ID,假设我的数据集如下所示:
ID
1
2
3
我正在为这些 Id 收集更多数据并将它们存储在一个新的数据集 (ds_combined_data) 中,例如:
ID | Date | Status
1 | 10.01.18 | 10
1 | 11.01.18 | 20
2 | 10.01.18 | 20
3 | 12.01.18 | 20
3 | 13.01.18 | 30
我现在要做的是通过一个名为processMethod的方法(并行)处理这个Dataset的子集(按ID),例如:
ID | Date | Status
1 | 10.01.18 | 10
1 | 11.01.18 | 20
因此,我尝试了以下方法:
ds_ids.foreach((ForeachFunction<Long>) row -> {
this.processMethod(ds_combined_data.where(col("ID").equalTo(row.longValue())));
});
我收到以下错误:
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 56.0 failed 4 times, most recent failure: Lost task 1.3 in stage 56.0 (TID 2651, local, executor 2)
: java.lang.NullPointerException at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:58)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2827)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1272)
at org.apache.spark.sql.Dataset.where(Dataset.scala:1300)
at SVDatenTransferJob.lambda$process$4a2a96b4$1(SVDatenTransferJob.java:184)
似乎无法从 foreach 函数内部访问 ds_combined_data。
所以我做了一些研究,似乎无法从 foreach 函数内部访问外部对象。
有人知道如何解决这个问题吗?有没有办法将变量/对象 ds_combined_data 传递给函数?
【问题讨论】:
标签: java apache-spark foreach dataset rdd