【问题标题】:Spark merge rows in one rowSpark 在一行中合并行
【发布时间】:2020-06-23 21:40:51
【问题描述】:

我有以下数据框:

#+-----------------------------+--------+--------+---------+
#|PROVINCE_STATE|COUNTRY_REGION|2/1/2020|2/1/2020|2/11/2020|
#+--------------+--------------+--------+--------+---------+
#|             -|     Australia|      12|      15|       15|
#+--------------+--------------+--------+--------+---------+

我需要将所有行合并为一个,并且日期的总和基于 COUNTRY_REGION。 问题是我有更多的列并且不知道如何动态地做到这一点。试过groupBy,但还是不行。谢谢。

【问题讨论】:

    标签: sql scala apache-spark pyspark apache-spark-sql


    【解决方案1】:

    如果您的前两列始终是省和州,而其他 n-列是日期,您可以在下面尝试 (Scala):

    import org.apache.spark.sql.functions._
    val dateCols = df.columns.drop(2).map(c => sum(c).as(c)) // select all columns except first 2 and perform sum on each of them
    df.groupBy('country_region).agg(dateCols.head,dateCols.tail:_*).show()
    

    python 版本:

    import pyspark.sql.functions as f
    dateCols = [f.sum(c) for c in df.columns][2:] # select all columns except first 2 and perform sum on each of them
    df.groupBy('country_region').agg(*dateCols).show()
    

    输出:

    +--------------+--------+---------+---------+
    |country_region|2/1/2020|2/10/2020|2/11/2020|
    +--------------+--------+---------+---------+
    |           aus|      12|       15|       15|
    +--------------+--------+---------+---------+
    

    【讨论】:

    • 我加了python版本,很像
    • 这比用Window分区好
    • python版本中的f是什么?
    • 从 org.apache.spark.sql.functions 导入
    • 我添加了完全导入
    【解决方案2】:

    使用聚合:

    select '-' as province_state, country_region,
           sum(`2/1/2020`), sum(`2/10/2020`), sum(`2/11/2020`)
    from t
    group by country_region;
    

    我不确定您所说的“动态”是什么意思。作为 SQL 查询,您需要单独列出每个表达式。

    【讨论】:

    • 我的意思是我有很多行日期,我应该手写每一列吗?我可以循环获取所有列吗?
    • #+-----------------------------+---------- ------------------+------------+-----------+------ -----+ #|PROVINCE_STATE |国家地区| 2/1/2020|2/1/2020|2/11/2020| #+--------------+--------------+------------------ ---------+-----------+------------+-------------#| -|澳大利亚|空|空|空 | #+--------------+--------------+--------+--------+ ---------+----------------------------------------
    • @JohnCoffey 。 . .我认为 SparkSQL 实际上对格式不正确的标识符使用反引号而不是双引号。
    【解决方案3】:

    试试这个。

    from pyspark.sql import functions as F
    from dateutil.parser import parse
    
    def is_date(string, fuzzy=False):
        try: 
            parse(string, fuzzy=fuzzy)
            return True
        except ValueError:
            return False
    
    df.groupBy(F.lit('-').alias("PROVINCE_STATE"),'COUNTRY_REGION')\
      .agg(*((F.sum(x)).cast('int').alias(x) for x in df.columns if is_date(x)==True)).show()
    
    
    #+--------------+--------------+--------+---------+---------+
    #|PROVINCE_STATE|COUNTRY_REGION|2/1/2020|2/10/2020|2/11/2020|
    #+--------------+--------------+--------+---------+---------+
    #|             -|     Australia|      12|       15|       15|
    #+--------------+--------------+--------+---------+---------+
    

    【讨论】:

      【解决方案4】:

      在 pyspark 中试试这个:一种方法是使用窗口函数

          from pyspark.sql import SparkSession
          from pyspark.sql import functions as F
          from pyspark.sql.window import Window
      
      
          spark = SparkSession.builder \
              .appName('SO')\
              .getOrCreate()
      
      
          sc= spark.sparkContext
      
          df = sc.parallelize([
              ("new south wales", "aus", 4, 4, 4),("victoria",  "aus", 4, 4, 4), ("queensland",  "aus", 3, 5, 5), ("south australia","aus", 1, 2, 2)
          ]).toDF(["province_state", "country_region", "2/1/2020", "2/10/2020", "2/11/2020"])
      
          df.show()
          #
          # +---------------+--------------+--------+---------+---------+
          # | province_state|country_region|2/1/2020|2/10/2020|2/11/2020|
          # +---------------+--------------+--------+---------+---------+
          # |new south wales|           aus|       4|        4|        4|
          # |       victoria|           aus|       4|        4|        4|
          # |     queensland|           aus|       3|        5|        5|
          # |south australia|           aus|       1|        2|        2|
          # +---------------+--------------+--------+---------+---------+
      
          w = Window().partitionBy('country_region')
      
          w1 = Window().partitionBy('country_region').orderBy('country_region')
      
          for column in df.columns:
              if column not in ['country_region','province_state']:
                  df = df.withColumn(column, F.sum(column).over(w) )
      
          df1 = df.withColumn("r_no", F.row_number().over(w1)).where(F.col('r_no')==1)
      
          df1.select(F.lit('_').alias('province_state'), *[ column for column in df1.columns if column not in ['province_state']]).drop(F.col('r_no')).show()
      
          # +--------------+--------------+--------+---------+---------+
          # |province_state|country_region|2/1/2020|2/10/2020|2/11/2020|
          # +--------------+--------------+--------+---------+---------+
          # |             _|           aus|      12|       15|       15|
          # +--------------+--------------+--------+---------+---------+
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-03-10
        • 1970-01-01
        • 1970-01-01
        • 2020-07-17
        • 1970-01-01
        • 1970-01-01
        • 2015-05-24
        • 2020-12-26
        相关资源
        最近更新 更多