【问题标题】:Spark dataframe transform multiple rows to columnSpark数据框将多行转换为列
【发布时间】:2016-02-17 08:56:12
【问题描述】:

我是 spark 新手,我想transform source dataframe(从 JSON 文件加载):

+--+-----+-----+
|A |count|major|
+--+-----+-----+
| a|    1|   m1|
| a|    1|   m2|
| a|    2|   m3|
| a|    3|   m4|
| b|    4|   m1|
| b|    1|   m2|
| b|    2|   m3|
| c|    3|   m1|
| c|    4|   m3|
| c|    5|   m4|
| d|    6|   m1|
| d|    1|   m2|
| d|    2|   m3|
| d|    3|   m4|
| d|    4|   m5|
| e|    4|   m1|
| e|    5|   m2|
| e|    1|   m3|
| e|    1|   m4|
| e|    1|   m5|
+--+-----+-----+

进入下面结果数据框

+--+--+--+--+--+--+
|A |m1|m2|m3|m4|m5|
+--+--+--+--+--+--+
| a| 1| 1| 2| 3| 0|
| b| 4| 2| 1| 0| 0|
| c| 3| 0| 4| 5| 0|
| d| 6| 1| 2| 3| 4|
| e| 4| 5| 1| 1| 1|
+--+--+--+--+--+--+

这是转换规则

  1. 结果数据框由A + (n major columns) 组成,其中major 列名称由以下人员指定:

    sorted(src_df.map(lambda x: x[2]).distinct().collect())
    
  2. 结果数据框包含m 行,其中A 列的值由以下人员提供:

    sorted(src_df.map(lambda x: x[0]).distinct().collect())
    
  3. 结果数据帧中每个主要列的值是来自相应A 和主要的源数据帧的值 (例如,源数据帧中第 1 行的计数映射到 box,其中 Aa 和列 m1

  4. 源数据框中Amajor的组合没有重复(请认为它是SQL中两列的主键)

【问题讨论】:

    标签: python apache-spark dataframe apache-spark-sql rdd


    【解决方案1】:

    使用 zero323 的数据框,

    df = sqlContext.createDataFrame([
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
    ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
    ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
    ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
    ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
    ("e", 1, "m4"), ("e", 1, "m5")], 
    ("a", "cnt", "major"))
    

    你也可以使用

    reshaped_df = df.groupby('a').pivot('major').max('cnt').fillna(0)
    

    【讨论】:

    • 这里的“max”有什么作用?
    • 它处理“cnt”的重复值。例如,如果您同时拥有 ("a", 1, "m1") 和 ("a", 2, "m1"),这将确保您最终会在 "m1" 列中得到值 2。
    • 啊,如果我知道 "cnt" 总是只有一个值,那么我可以使用 select("cnt") 代替吗?
    【解决方案2】:

    让我们从示例数据开始:

    df = sqlContext.createDataFrame([
        ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
        ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
        ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
        ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
        ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
        ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
        ("e", 1, "m4"), ("e", 1, "m5")], 
        ("a", "cnt", "major"))
    

    请注意,我已将 count 更改为 cnt。 Count 是大多数 SQL 方言中的保留关键字,它不是列名的好选择。

    至少有两种方法可以重塑这些数据:

    • 在 DataFrame 上进行聚合

      from pyspark.sql.functions import col, when, max
      
      majors = sorted(df.select("major")
          .distinct()
          .map(lambda row: row[0])
          .collect())
      
      cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m) 
          for m in  majors]
      maxs = [max(col(m)).alias(m) for m in majors]
      
      reshaped1 = (df
          .select(col("a"), *cols)
          .groupBy("a")
          .agg(*maxs)
          .na.fill(0))
      
      reshaped1.show()
      
      ## +---+---+---+---+---+---+
      ## |  a| m1| m2| m3| m4| m5|
      ## +---+---+---+---+---+---+
      ## |  a|  1|  1|  2|  3|  0|
      ## |  b|  4|  1|  2|  0|  0|
      ## |  c|  3|  0|  4|  5|  0|
      ## |  d|  6|  1|  2|  3|  4|
      ## |  e|  4|  5|  1|  1|  1|
      ## +---+---+---+---+---+---+
      
    • groupBy 超过 RDD

      from pyspark.sql import Row
      
      grouped = (df
          .map(lambda row: (row.a, (row.major, row.cnt)))
          .groupByKey())
      
      def make_row(kv):
          k, vs = kv
          tmp = dict(list(vs) + [("a", k)])
          return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors})
      
      reshaped2 = sqlContext.createDataFrame(grouped.map(make_row))
      
      reshaped2.show()
      
      ## +---+---+---+---+---+---+
      ## |  a| m1| m2| m3| m4| m5|
      ## +---+---+---+---+---+---+
      ## |  a|  1|  1|  2|  3|  0|
      ## |  e|  4|  5|  1|  1|  1|
      ## |  c|  3|  0|  4|  5|  0|
      ## |  b|  4|  1|  2|  0|  0|
      ## |  d|  6|  1|  2|  3|  4|
      ## +---+---+---+---+---+---+
      

    【讨论】:

      猜你喜欢
      • 2017-06-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多