【问题标题】:Spark: Serialization not working with AggregateSpark:序列化不适用于聚合
【发布时间】:2016-07-06 08:14:40
【问题描述】:

我有这个类(Java 中),我想在 Spark (1.6) 中使用它:

public class Aggregation {
  private Map<String, Integer> counts;

  public Aggregation() {
    counts = new HashMap<String, Integer>();
  }

  public Aggregation add(Aggregation ia) {
    String key = buildCountString(ia);
    addKey(key);
    return this;
  }

  private void addKey(String key, int cnt) {
    if(counts.containsKey(key)) {
        counts.put(key, counts.get(key) + cnt);
    }
    else {
        counts.put(key, cnt);
    }
  }

  private void addKey(String key) {
    addKey(key, 1);
  }

  public Aggregation merge(Aggregation agg) {
    for(Entry<String, Integer> e: agg.counts.entrySet()) {
        this.addKey(e.getKey(), e.getValue());
    }
    return this;
  }

  private String buildCountString(Aggregation rec) {
    ...
  }
}

在启动 Spark 时,我启用了 Kyro 并添加了这个类(在 Scala 中):

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Aggregation]))

我想像这样 (Scala) 将它与 Spark 聚合一起使用:

rdd.aggregate(new InteractionAggregation)((agg, rec) => agg.add(rec), (a, b) => a.merge(b) )

不知何故,这引发了“任务不可序列化”异常。

但是当我使用带有 map 和 reduce 的类时,一切正常:

val rdd2= interactionObjects.map( _ => new InteractionAggregation())
rdd2.reduce((a,b) => a.merge(b))
println(rdd2.count())

您知道为什么聚合会出现错误,而 map/reduce 不会出现错误吗?

感谢和问候!

【问题讨论】:

    标签: java serialization apache-spark


    【解决方案1】:

    您的聚合类应该实现可序列化。当您调用聚合时,驱动程序会将您的 (new Aggregation()) 对象发送给所有工作人员,这会导致序列化错误。

    【讨论】:

    • 这确实解决了这个问题。但是我认为在使用 Kyro 时没有必要实现 Serializable !?还是 Spark 在使用聚合时会回退到 Java 序列化?
    • 如果没有记错的话,Kryo 只处理一些核心 scala 不可序列化的类,但不是所有的类。见github.com/EsotericSoftware/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-26
    • 1970-01-01
    • 1970-01-01
    • 2012-12-27
    • 2016-02-26
    • 2017-08-03
    相关资源
    最近更新 更多