【问题标题】:Spark: Iterate Subsets of Datasets by key (where clause)Spark:按键迭代数据集的子集(where子句)
【发布时间】:2019-01-08 08:55:15
【问题描述】:

我希望有人可以帮助我解决 Spark 问题。

我在一个数据集 (ds_ids) 中有一堆 ID,假设我的数据集如下所示:

ID
1
2
3

我正在为这些 Id 收集更多数据并将它们存储在一个新的数据集 (ds_combined_data) 中,例如:

ID       |      Date      |        Status
1        |   10.01.18     |         10
1        |   11.01.18     |         20
2        |   10.01.18     |         20
3        |   12.01.18     |         20
3        |   13.01.18     |         30

我现在要做的是通过一个名为processMethod的方法(并行)处理这个Dataset的子集(按ID),例如:

ID       |      Date      |        Status
1        |   10.01.18     |         10
1        |   11.01.18     |         20

因此,我尝试了以下方法:

ds_ids.foreach((ForeachFunction<Long>) row -> {
                    this.processMethod(ds_combined_data.where(col("ID").equalTo(row.longValue())));
                });

我收到以下错误:

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 56.0 failed 4 times, most recent failure: Lost task 1.3 in stage 56.0 (TID 2651, local, executor 2) 
: java.lang.NullPointerException at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167) 
 at org.apache.spark.sql.Dataset$.apply(Dataset.scala:58) 
 at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2827) 
 at org.apache.spark.sql.Dataset.filter(Dataset.scala:1272) 
 at org.apache.spark.sql.Dataset.where(Dataset.scala:1300) 
 at SVDatenTransferJob.lambda$process$4a2a96b4$1(SVDatenTransferJob.java:184) 

似乎无法从 foreach 函数内部访问 ds_combined_data。

所以我做了一些研究,似乎无法从 foreach 函数内部访问外部对象。

有人知道如何解决这个问题吗?有没有办法将变量/对象 ds_combined_data 传递给函数?

【问题讨论】:

    标签: java apache-spark foreach dataset rdd


    【解决方案1】:

    我现在要做的是(并行)处理此数据集的子集(按 ID)

    如果我理解得很好,您希望对数据集进行分区,以便每个分区包含具有相同 ID 的所有元素。在 Spark 中,您可以使用 repartitionmapPartition 来实现。

        ds.repartition(ds.col("ID")).mapPartitions(new MapPartitionsFunction() {
            @Override
            public Iterator call(Iterator iterator) throws Exception {
                // you can call processMethod passing iterator
                // Example of processing each Row of the iterator:
                while(iterator.hasNext()) {
                    Row r = (Row) iterator.next();
                    Integer id = r.getInt(0);
                }
                return iterator;
            }
        }, Encoders.kryo(Row.class)).show();
    

    使用repartition(ds.col("ID")),您将强制 Spark 重新组织您的数据集,以便您拥有一个按 ID 划分的分区。然后,使用mapPartition,您可以将每个分区作为行的迭代器进行处理。最后,您必须返回另一个迭代器。请注意,您必须为要保存在迭代器中的对象类型指定编码器。您可以返回 Rows、Long 等基本类型或您自己创建的 POJO 类型。

    希望对您有所帮助!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-03
      • 1970-01-01
      • 2017-11-12
      • 1970-01-01
      • 2017-02-17
      • 1970-01-01
      相关资源
      最近更新 更多