【问题标题】:Flatten nested tuples in RDD展平 RDD 中的嵌套元组
【发布时间】:2018-11-09 17:04:23
【问题描述】:

我正在使用 Spark SQL 从表中提取行。其中一些数据是重复出现的,我正在尝试计算出现的次数。本质上,我正在尝试执行基本的“字数统计”示例,但我的数据不是以下形式:(Word : String, Count : Int),而是有一行数据替换了单词/字符串。

更具体地说,我的数据看起来像:RDD[((row), count)],其中行是从 sql 表中提取的,包含字符串、双精度数、整数等。

它是RDD 形式,因为我想使用reduceByKey。请参阅:Avoid groupByKey。它是一个 (Key, Value) 对,有一个很长的键(来自 sql 数据库的某行),其值为“字数”。

我的应用正在这样做:

myDataframe
    // Append a 1 to each row
    .map(row => (row, 1))
    // Convert to RDD so we can use the reduceByKey method
    .rdd
    // Add up the 1's corresponding to matching keys
    .reduceByKey(_ + _)
    //Filter by rows that show up more than 10 times
    .filter(_._2 > 100)

    ...

现在假设我的行数据包含(string, double, int)。 这是我想将我的数据从RDD[((string, double, int), count)] 解包到RDD[(string, double, int, count)] 的地方,以便我最终可以将这些数据保存到另一个 SQL 表中。

有没有什么方法可以让我解压这个...嵌套元组...之类的内容?

我的解决方案是像这样“解包” RDD 的元素: .map(row => (row._1._1, row._1._2, row._1._3, row._2))

但一定有更好的方法!如果我决定从行中获取更多元素,则必须修改此 .map() 调用。

谢谢!

【问题讨论】:

    标签: sql scala apache-spark rdd flatten


    【解决方案1】:

    您可以使用RowtoSeqfromSeq,如下例所示:

    val df = Seq(
      ("a", 10.0, 1),
      ("a", 10.0, 1),
      ("b", 20.0, 2),
      ("c", 30.0, 3),
      ("c", 30.0, 3)
    ).toDF("c1", "c2", "c3")
    
    import org.apache.spark.sql.Row
    
    df.rdd.
      map((_, 1)).
      reduceByKey(_ + _).
      filter(_._2 > 1).
      map{
        case (row: Row, count: Int) => Row.fromSeq(row.toSeq :+ count)
      }
    // res1: Array[org.apache.spark.sql.Row] = Array([a,10.0,1,2], [c,30.0,3,2])
    

    【讨论】:

    • 感谢您的建议!我遇到了一个有点类似的建议,但发现创建一个序列很有趣……只是在下一步中从该序列中提取内容。看起来更像是一个补丁而不是一个解决方案。如果我的列发生变化,我也不想重写代码“例如:_._2 到 _._3”。
    • Row 就像Tuple 一样,其中元素可以是各种类型,向Tuple 添加(或删除)元素并不像对Seq 那样简单。这就是为什么Row 配备了to/fromSeq 方法来解决这种需求。在这个用例中,当发生转换时,它们将采用任何行,因此如果行已更改,则无需更改代码。这些方法也常用于其他用例(例如数据框中的creating contiguous indexes)。
    【解决方案2】:

    您不必重新使用 RDD;您正确引用的文章警告不要使用RDD.groupByKey,但它不应该应用于DataFrame 的groupBy。在 DataFrame 上使用 groupBy 是安全的(并且性能良好)!查看更多here

    因此,要按所有 DataFrame 的列进行分组,计算每个组的出现次数,并过滤计数 > 10 的组,您可以简单地使用:

    df.groupBy(df.columns.map(col): _*) // alternatively: df.groupBy(df.columns.head, df.columns.tail: _*)
      .count()
      .filter($"count" > 10)
    

    结果具有类似于输入的架构,并带有额外的count 长列。

    【讨论】:

    • 感谢您指出这一点!关于性能不同的帖子阅读起来非常有趣。我看到在您的示例中,您计算​​了组中元素的数量,并过滤了“流行”元素,但我不确定您为什么要在“col”上制作地图。我不认为这是列名的占位符......但我遇到了语法错误。
    • 有两个 groupBy 方法:一个需要 cols: Column* 类型的参数 - 表示一系列 Column 对象,这是我正在使用的对象,另一个需要 两个参数:(col1: String, cols: String*) - 表示第一列的名称,然后是其余名称;您可以使用其中任何一个,但我发现前者更方便,因为它不需要单独传递第一列;如果这些签名不匹配,只需调用 groupBy(df.columns: _*) (常见错误 - 请参阅 select 的类似问题:stackoverflow.com/a/36131805/5344058)。
    • ... 原来我只需要import org.apache.spark.sql.functions._ 更有意义;我认为“col”作为占位符有一些巫术。原来这只是一个很好的 ole' 功能。感谢您的澄清和进一步阅读!
    猜你喜欢
    • 2019-03-05
    • 2020-10-04
    • 2017-10-10
    • 2012-11-21
    • 1970-01-01
    • 2016-06-24
    • 1970-01-01
    • 1970-01-01
    • 2023-04-03
    相关资源
    最近更新 更多