【Question title】: Using KeyValueGroupedDataset cogroup in Spark
【Posted】: 2018-01-17 19:53:30
【Question】:

I want to use the cogroup method on a KeyValueGroupedDataset in Spark. Here is my attempt in Scala:

import org.apache.spark.sql.functions._
val x1 = Seq(("a", 36), ("b", 33), ("c", 40), ("a", 38), ("c", 39)).toDS
val g1 = x1.groupByKey(_._1)
val x2 = Seq(("a", "ali"), ("b", "bob"), ("c", "celine"), ("a", "amin"), ("c", "cecile")).toDS
val g2 = x2.groupByKey(_._1)
val cog = g1.cogroup(g2, (k: Long, iter1:Iterator[(String, Int)], iter2:Iterator[(String, String)]) =>  iter1);

It fails to compile with the following error:

<console>:34: error: overloaded method value cogroup with alternatives:
  [U, R](other: org.apache.spark.sql.KeyValueGroupedDataset[String,U], f: org.apache.spark.api.java.function.CoGroupFunction[String,(String, Int),U,R], encoder: org.apache.spark.sql.Encoder[R])org.apache.spark.sql.Dataset[R] <and>
  [U, R](other: org.apache.spark.sql.KeyValueGroupedDataset[String,U])(f: (String, Iterator[(String, Int)], Iterator[U]) => TraversableOnce[R])(implicit evidence$11: org.apache.spark.sql.Encoder[R])org.apache.spark.sql.Dataset[R]
 cannot be applied to (org.apache.spark.sql.KeyValueGroupedDataset[String,(String, String)], (Long, Iterator[(String, Int)], Iterator[(String, String)]) => Iterator[(String, Int)])
       val cog = g1.cogroup(g2, (k: Long, iter1:Iterator[(String, Int)], iter2:Iterator[(String, String)]) =>  iter1);

I get the same error in Java.

【Comments】:

    Tags: scala apache-spark


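    【Solution 1】:

    The cogroup overload you are trying to use is curried, so the other dataset and the function must be passed in separate argument lists. The key type is also mismatched: the groups are keyed by String, but your function declares the key as Long: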

    g1.cogroup(g2)(
      (k: String, it1: Iterator[(String, Int)], it2: Iterator[(String, String)]) => 
        it1)
    

    Or simply, letting the compiler infer the parameter types:

    g1.cogroup(g2)((_, it1, _) => it1)
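
The calls above return only the left-hand iterator. As a minimal sketch of a function that consumes both sides, assuming a local SparkSession and the sample data from the question, the following sums the ages and joins the names for each key. The names `CogroupSketch` and `summarize` are hypothetical and introduced only for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical wrapper object, not part of the original answer.
object CogroupSketch {

  // Returns one (key, sum of ages, comma-joined names) row per key.
  def summarize(spark: SparkSession): Dataset[(String, Int, String)] = {
    import spark.implicits._ // provides toDS and the tuple encoders

    val x1 = Seq(("a", 36), ("b", 33), ("c", 40), ("a", 38), ("c", 39)).toDS
    val x2 = Seq(("a", "ali"), ("b", "bob"), ("c", "celine"),
                 ("a", "amin"), ("c", "cecile")).toDS

    val g1 = x1.groupByKey(_._1)
    val g2 = x2.groupByKey(_._1)

    // Curried call: first argument list is the other grouped dataset,
    // second is the function. Parameter types are inferred from g1 and g2.
    g1.cogroup(g2) { (key, ages, names) =>
      val total = ages.map(_._2).sum
      // Sort the names because the order inside each iterator is not guaranteed.
      val joined = names.map(_._2).toSeq.sorted.mkString(",")
      Iterator((key, total, joined))
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode is enough for this illustration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cogroup-sketch")
      .getOrCreate()
    summarize(spark).show()
    spark.stop()
  }
}
```

With the sample data, key "a" should yield a total of 74 and the names "ali,amin"; the order of the output rows is unspecified. Each iterator can be traversed only once, so materialize it (for example with `toSeq`) if the function needs more than one pass.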
    

    In Java, I would use the CoGroupFunction variant, which takes the function and an explicit encoder in a single argument list:

    import org.apache.spark.api.java.function.CoGroupFunction;
    import org.apache.spark.sql.Encoders;
    import scala.Tuple2;
    
    g1.cogroup(
      g2,
      (CoGroupFunction<String, Tuple2<String, Integer>, Tuple2<String, String>, Tuple2<String, Integer>>) (key, it1, it2) -> it1,
      Encoders.tuple(Encoders.STRING(), Encoders.INT()));
    

    where g1 and g2 are KeyValueGroupedDataset<String, Tuple2<String, Integer>> and KeyValueGroupedDataset<String, Tuple2<String, String>>, respectively.
