【问题标题】:How to name aggregate columns?如何命名聚合列?
【发布时间】:2016-11-29 06:50:12
【问题描述】:

我在 Scala 中使用 Spark,我的聚合列是匿名的。有没有一种方便的方法来重命名数据集中的多个列?我想过用as 强加一个模式,但关键列是一个结构(由于groupBy 操作),我不知道如何定义一个case classStructType

我尝试如下定义架构:

val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
                                                             StructField("dst", IntegerType), true)), 
                              StructField("count", LongType, true))
edge_count.as[returnSchema]

但我得到一个编译错误:

Message: <console>:74: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, Boolean)
       val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),

【问题讨论】:

  • 你能给我们看看代码吗?那么也许我可以制定一个更好的方法?
  • 假设您有一个包含三列的数据集。按前两个分组,按第三个计数。关键是一个元组。我在 Spark 1.6.2 上。谢谢@AlbertoBonsanto!

标签: scala apache-spark apache-spark-dataset


【解决方案1】:

最好的解决方案是明确命名您的列,例如,

df
  .groupBy('a, 'b)
  .agg(
    expr("count(*) as cnt"),
    expr("sum(x) as x"),
    expr("sum(y)").as("y")
  )

如果您使用的是数据集,则必须提供列的类型,例如,expr("count(*) as cnt").as[Long]

您可以直接使用 DSL,但我经常发现它比简单的 SQL 表达式更冗长。

如果您想进行批量重命名,请使用 Map,然后使用 foldLeft 数据框。

【讨论】:

  • 这给了我一个类型不匹配的错误;输入是一个数据集。
  • 这是因为 expr() 返回 Column 并且您需要在数据集 API 中使用 TypedColumn。我已更新答案以显示数据集示例。
【解决方案2】:

我最终将aliases 与select 语句一起使用;例如,

ds.select($"key.src".as[Short], 
          $"key.dst".as[Short], 
          $"sum(count)".alias("count").as[Long])

首先我必须使用printSchema 来确定派生列名:

> ds.printSchema

root
 |-- key: struct (nullable = false)
 |    |-- src: short (nullable = false)
 |    |-- dst: short (nullable = false)
 |-- sum(count): long (nullable = true)

【讨论】:

    【解决方案3】:

    我同意 Sim 的回答,即最方便的方法是在创建时明确命名列。这只是另一种为列命名的方法(不使用expr):

    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    val df = Seq(
      (1, "a"),
      (2, "b"),
      (3, "c")
    ).toDF("number", "word")
    
    val aggDf = df.agg(
      collect_list(struct(col("number"), col("word"))) as "myStruct",
      sum(col("number")) as "mySum",
      count(col("*")) as "myCnt"
    )
    
    aggDf.printSchema
    
    // |-- myStruct: array (nullable = true)
    // |    |-- element: struct (containsNull = true)
    // |    |    |-- number: integer (nullable = false)
    // |    |    |-- word: string (nullable = true)
    // |-- mySum: long (nullable = true)
    // |-- myCnt: long (nullable = false)
    
    aggDf.show()
    
    // +------------------------+-----+-----+
    // |myStruct                |mySum|myCnt|
    // +------------------------+-----+-----+
    // |[[1, a], [2, b], [3, c]]|6    |3    |
    // +------------------------+-----+-----+
    

    【讨论】:

      猜你喜欢
      • 2021-12-24
      • 1970-01-01
      • 2015-07-11
      • 2019-11-29
      • 2013-02-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多