【发布时间】:2016-07-06 08:14:40
【问题描述】:
我有这个类(Java 中),我想在 Spark (1.6) 中使用它:
public class Aggregation {
private Map<String, Integer> counts;
public Aggregation() {
counts = new HashMap<String, Integer>();
}
public Aggregation add(Aggregation ia) {
String key = buildCountString(ia);
addKey(key);
return this;
}
private void addKey(String key, int cnt) {
if(counts.containsKey(key)) {
counts.put(key, counts.get(key) + cnt);
}
else {
counts.put(key, cnt);
}
}
private void addKey(String key) {
addKey(key, 1);
}
public Aggregation merge(Aggregation agg) {
for(Entry<String, Integer> e: agg.counts.entrySet()) {
this.addKey(e.getKey(), e.getValue());
}
return this;
}
private String buildCountString(Aggregation rec) {
...
}
}
在启动 Spark 时,我启用了 Kyro 并添加了这个类(在 Scala 中):
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Aggregation]))
我想像这样 (Scala) 将它与 Spark 聚合一起使用:
rdd.aggregate(new InteractionAggregation)((agg, rec) => agg.add(rec), (a, b) => a.merge(b) )
不知何故,这引发了“任务不可序列化”异常。
但是当我使用带有 map 和 reduce 的类时,一切正常:
val rdd2= interactionObjects.map( _ => new InteractionAggregation())
rdd2.reduce((a,b) => a.merge(b))
println(rdd2.count())
您知道为什么聚合会出现错误,而 map/reduce 不会出现错误吗?
感谢和问候!
【问题讨论】:
标签: java serialization apache-spark