【问题标题】:Convert class object to Java RDD将类对象转换为 Java RDD
【发布时间】:2017-01-16 01:40:40
【问题描述】:

我有一个 pojo-AnalyticsModel,我有一个循环,我在其中为 pojo 创建对象。每次创建对象时,我都会保存到名为 AnalyticsModelList 的列表中。然后在退出循环后,我将列表转换为 RDD。下面是我的代码:

do {    
       AnalyticsModel analyticsObj = new AnalyticsModel(time, columnName, aggrResult);
       analyticsList.add(analyticsObj);
    }while (cal.getTimeInMillis() <= endDate);

    JavaRDD<AnalyticsModel> analyticsRdd = sc.parallelize(analyticsList);

我可以直接将对象作为 RDD,而不是添加到列表然后将其转换为 RDD。有人可以指导我吗?我对 Spark 很陌生。

谢谢

【问题讨论】:

  • 它看起来不错,这是一个更好的方法,只需使用Kyro serializer,工作会更快

标签: java apache-spark rdd


【解决方案1】:

有办法。但是如果您的AnalyticsModel 对象很少,您正在做的事情就很好(您不必通过以下解决方案使您的生活变得复杂)。 如果您要创建大量 AnalyticsModel 对象,请考虑以下解决方案。

此类每次调用都会生成一个AnalyticsModel 实例。

public class AnalyticsModelGenerator implements FlatMapFunction<String, AnalyticsModel> {

    private final Long endDate;
    private final Calendar cal;

    public AnalyticsModelGenerator(Calendar cal, Long endDate) {
        this.cal = cal;
        this.endDate = endDate;
    }
    @Override
    public Iterable<AnalyticsModel> call(String dummyInput) throws Exception {
        return new Iterable<AnalyticsModel>() {
            @Override
            public Iterator<AnalyticsModel> iterator() {
                return new AMIterator(cal, endDate);
            }
        }
    }

    private class AMIterator implements Iterator<AnalyticsModel> {
        private final Long endDate;
        private final Calendar cal;

        public AMIterator(Calendar cal, Long endDate) {
            this.cal = cal;
            this.endDate = endDate;
        }

        @Override
        public boolean hasNext() {
            return (cal.getTimeInMillis() <= endDate);
        }

        @Override
        public AnalyticsModel next() {
            return new AnalyticsModel(time, columnName, aggrResult);
        }
    }
}

下面的代码创建了一个AnalyticsModel对象的RDD:

JavaRDD<String> initJavaRDD = jSparkContext.parallelize(new ArrayList<String>(){{add("dummyInput");}});
JavaRDD<AnalyticsModel> amJavaRDD = initJavaRDD.flatMap(new AnalyticsModelGenerator(cal, endDate));

【讨论】:

  • 对不起,我不能完全按照下面的代码:JavaRDD initJavaRDD = jSparkContext.parallelize(new ArrayList(){{add("dummyInput");} });我想我想出了一个更简单的解决方案,我已经在下面发布了
  • 它使用一个不会在任何地方使用的虚拟字符串创建一个新的 RDD,然后 flatMap 根据您的需要创建尽可能多的AnalyticsModel 对象。
  • 感谢您的澄清。这个解决方案看起来不错,但不幸的是我不能在我的代码中使用它,因为我使用的是 Spark 1.6,其中调用函数返回 Iterable 而不是 Iterator。
  • 感谢更新代码。这对我来说很好:)
猜你喜欢
  • 2015-08-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-01-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多