【Question title】: Type Mismatch for Reduce
【Posted】: 2015-09-15 17:40:00
【Question】:

I have an RDD:

JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedRdd = dataRDD
    .values().map(mapFunc);

I want to run a reduce function on it:

private static Function2<Tuple2<Tuple2<String, Long>, Long>, Tuple2<Tuple2<String, Long>, Long>, Tuple2<Tuple2<String, Long>, Long>> redFunc2 = new Function2<Tuple2<Tuple2<String, Long>, Long>, Tuple2<Tuple2<String, Long>, Long>, Tuple2<Tuple2<String, Long>, Long>>() {

@Override
public Tuple2<String, MetricDatum> call(
  Tuple2<Tuple2<String, Long>, Long> v1,
  Tuple2<Tuple2<String, Long>, Long> v2) throws Exception {
  long sum = 0L; // sum up the values
  sum += v1._2();
  sum += v2._2();

  String dimension = v1._1()._1();
  long timestamp = v1._1()._2();

  MetricDatum metricDatum = new MetricDatum();
  metricDatum.setMetricDimension(dimension);
  metricDatum.setTimestamp(timestamp);

  String key = metricDatum.getMetricDimension().toString();
  key += "_" + Long.toString(timestamp);
  metricDatum.setMetric(sum);
  return new Tuple2<>(key, metricDatum);
}

};

But this line gives an error:

JavaRDD<Tuple2<Tuple2<String, Long>, Long>>  reducedGoraRdd = mappedRdd.reduce(redFunc);

I am trying to do this by following the Spark LogAnalytics.java example.

What am I missing? Should I use flatMap or something similar, or is the reduce function completely wrong?

【Comments】:

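  • A reduce function must return the same type as the elements of the RDD. It also has to be commutative and associative.
  • I need to return something (a Tuple2) whose type differs from the RDD's element type. What should I use then?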
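The first comment's rule can be tried out in plain Java. This sketch uses `java.util.stream` as a stand-in for Spark (an assumption for illustration, not Spark code): the two-argument `reduce` takes a `(T, T) -> T` function, so getting a result of a different type needs either a `map` step first or the three-argument `reduce(identity, accumulator, combiner)`. Spark's `JavaRDD.aggregate(zeroValue, seqOp, combOp)` has the same shape. The `Row` class and the sample values are hypothetical.

```java
import java.util.List;

public class ReduceTypes {
    // Hypothetical element type standing in for Tuple2<Tuple2<String, Long>, Long>
    static final class Row {
        final String dimension;
        final long timestamp;
        final long value;

        Row(String dimension, long timestamp, long value) {
            this.dimension = dimension;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Two-argument reduce: the function must be (Row, Row) -> Row.
    // Like the question's code, it keeps the first row's key, which is only
    // meaningful when all rows share that key.
    static Row reduceSameType(List<Row> rows) {
        return rows.stream()
                .reduce((a, b) -> new Row(a.dimension, a.timestamp, a.value + b.value))
                .orElseThrow();
    }

    // Three-argument reduce: the result type (Long) differs from the element type (Row).
    // The accumulator folds one Row into a running Long; the combiner merges two partial Longs.
    static long reduceToOtherType(List<Row> rows) {
        return rows.stream().reduce(0L, (acc, row) -> acc + row.value, Long::sum);
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row("cpu", 100L, 3L), new Row("cpu", 100L, 4L));
        System.out.println(reduceSameType(rows).value); // 7
        System.out.println(reduceToOtherType(rows));    // 7
    }
}
```

Both calls combine values with addition, which is commutative and associative, so the result does not depend on how the elements are split into partitions.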

Tags: apache-spark reduce


【Answer 1】:

Based on the reduce function in LogAnalytics.java, I would write it like this:

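import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// dummy stand-in for the real MetricDatum class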
class MetricDatum {
    public void setMetricDimension(String l) {}
    public void setTimestamp(Long l) {}
    public void setMetric(Long l) {}
    public Object getMetricDimension() {return new Object();}
}
// fake (empty) input; sc is an existing JavaSparkContext
JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedRdd = sc.emptyRDD();

//creating JavaPairRDD from JavaRDD of pairs
JavaPairRDD.fromJavaRDD(mappedRdd)
//reduce with commutative, associative function (Long, Long) -> Long
.reduceByKey(new Function2<Long, Long, Long>() {
    @Override
    public Long call(Long aLong, Long aLong2) throws Exception {
        return aLong + aLong2;
    }
})
// map (key, sum) pairs to (newKey, metricDatum(sum)), creating a JavaPairRDD
.mapToPair(new PairFunction<Tuple2<Tuple2<String,Long>,Long>, String, MetricDatum>() {
    @Override
    public Tuple2<String, MetricDatum> 
            call(Tuple2<Tuple2<String, Long>, Long> tuple2LongTuple2) throws Exception {
        String dimension = tuple2LongTuple2._1()._1();
        long timestamp = tuple2LongTuple2._1()._2();

        MetricDatum metricDatum = new MetricDatum();
        metricDatum.setMetricDimension(dimension);
        metricDatum.setTimestamp(timestamp);

        String key = metricDatum.getMetricDimension().toString();
        key += "_" + Long.toString(timestamp);
        metricDatum.setMetric(tuple2LongTuple2._2());
        return new Tuple2<String, MetricDatum>(key, metricDatum);
    }
});
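To see what the `reduceByKey` + `mapToPair` pipeline above computes without a Spark cluster, here is a plain-Java sketch of the same two steps. It is an approximation under stated assumptions: `Map.Entry` stands in for `Tuple2`, `HashMap.merge` with `Long::sum` plays the role of `reduceByKey`, a plain `Long` replaces `MetricDatum` in the output, and the `row` helper and sample data are hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceByKeySketch {
    // Hypothetical helper: builds ((dimension, timestamp), value), a stand-in for
    // Tuple2<Tuple2<String, Long>, Long>
    static Map.Entry<Map.Entry<String, Long>, Long> row(String dimension, long timestamp, long value) {
        Map.Entry<String, Long> key = new SimpleImmutableEntry<>(dimension, timestamp);
        return new SimpleImmutableEntry<>(key, value);
    }

    static Map<String, Long> sumPerKey(List<Map.Entry<Map.Entry<String, Long>, Long>> rows) {
        // Step 1 (like reduceByKey): merge the values of rows that share a
        // (dimension, timestamp) key with the commutative, associative Long::sum.
        Map<Map.Entry<String, Long>, Long> sums = new HashMap<>();
        for (Map.Entry<Map.Entry<String, Long>, Long> r : rows) {
            sums.merge(r.getKey(), r.getValue(), Long::sum);
        }
        // Step 2 (like mapToPair): build the "dimension_timestamp" key for each summed pair.
        // A TreeMap is used only so that the printed order is deterministic.
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<Map.Entry<String, Long>, Long> e : sums.entrySet()) {
            String key = e.getKey().getKey() + "_" + e.getKey().getValue();
            result.put(key, e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<Map.Entry<String, Long>, Long>> rows = List.of(
                row("cpu", 100L, 3L),
                row("cpu", 100L, 4L),
                row("mem", 100L, 5L));
        System.out.println(sumPerKey(rows)); // {cpu_100=7, mem_100=5}
    }
}
```

The key point carries over to Spark: the reduce step only ever combines two `Long`s into a `Long`, and the change of type to `(String, MetricDatum)` happens afterwards in a separate map step.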

