【发布时间】:2015-09-15 17:40:00
【问题描述】:
我有一个 RDD:
JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedRdd = dataRDD
.values().map(mapFunc);
我想在它上面运行一个reduce函数:
private static Function2<Tuple2<Tuple2<String, Long>, Long>, Tuple2<Tuple2<String, Long>, Long>, Tuple2<Tuple2<String, Long>, Long>> redFunc2 = new Function2<Tuple2<Tuple2<String, Long>, Long>, Tuple2<Tuple2<String, Long>, Long>, Tuple2<Tuple2<String, Long>, Long>>() {
@Override
public Tuple2<String, MetricDatum> call(
Tuple2<Tuple2<String, Long>, Long> v1,
Tuple2<Tuple2<String, Long>, Long> v2) throws Exception {
long sum = 0L; // sum up the values
sum += v1._2();
sum += v2._2();
String dimension = v1._1()._1();
long timestamp = v1._1()._2();
MetricDatum metricDatum = new MetricDatum();
metricDatum.setMetricDimension(dimension);
metricDatum.setTimestamp(timestamp);
String key = metricDatum.getMetricDimension().toString();
key += "_" + Long.toString(timestamp);
metricDatum.setMetric(sum);
return new Tuple2<>(key, metricDatum);
}
};
但是它给出了错误:
JavaRDD<Tuple2<Tuple2<String, Long>, Long>> reducedGoraRdd = mappedRdd.reduce(redFunc);
我想用 Spark LogAnalytics.java做这个例子
我错过了什么,我应该使用 flatMap 等还是 reduce 功能完全错误?
【问题讨论】:
-
Reduce 函数应该返回与 rdd 的元素相同的类型。此外,它必须是可交换的和关联的。
-
我需要返回不同于 rdd 类型的东西 (Tuple2
) 那么,我应该使用什么?
标签: apache-spark reduce