【问题标题】:How to explode multiple columns, different types and different lengths?如何炸开多列、不同类型、不同长度?
【发布时间】:2019-07-08 08:44:53
【问题描述】:

我有一个包含不同时间周期(1/6、3/6、6/6 等)列的 DF,并希望“分解”所有列以创建一个新的 DF,其中每一行都是一个 1/6 周期。

from pyspark import Row 
from pyspark.sql import SparkSession 
from pyspark.sql.functions import explode, arrays_zip, col

spark = SparkSession.builder \
    .appName('DataFrame') \
    .master('local[*]') \
    .getOrCreate()

df = spark.createDataFrame([Row(a=1, b=[1, 2, 3, 4, 5, 6], c=[11, 22, 33], d=['foo'])])

|  a|                 b|           c|    d|
+---+------------------+------------+-----+
|  1|[1, 2, 3, 4, 5, 6]|[11, 22, 33]|[foo]|
+---+------------------+------------+-----+

我正在做爆炸:

df2 = (df.withColumn("tmp", arrays_zip("b", "c", "d"))
       .withColumn("tmp", explode("tmp"))
       .select("a", col("tmp.b"), col("tmp.c"), "d"))

但输出不是我想要的:

|  a|  b|   c|    d|
+---+---+----+-----+
|  1|  1|  11|[foo]|
|  1|  2|  22|[foo]|
|  1|  3|  33|[foo]|
|  1|  4|null|[foo]|
|  1|  5|null|[foo]|
|  1|  6|null|[foo]|
+---+---+----+-----+

我希望它看起来像这样:

|  a|  b|  c|  d|
+---+---+---+---+
|  1|  1| 11|foo|
|   |  2|   |   |
|   |  3| 22|   |
|   |  4|   |   |
|   |  5| 33|   |
|   |  6|   |   |
+---+---+---+---+

我是 Spark 的新手,从一开始我就有很复杂的话题! :)

2019-07-15 更新:也许有人有不使用 UDF 的解决方案? -> 由@jxc 回答

2019-07-17 更新:也许有人有一个解决方案,如何以更复杂的顺序更改空 值序列?就像在c - Null, 11, Null, 22, Null, 33 或更复杂的情况下,我们希望在列d 第一个值是Null,下一个是foo,然后是Null, Null, Null

|  a|  b|  c|  d|
+---+---+---+---+
|  1|  1|   |   |
|   |  2| 11|foo|
|   |  3|   |   |
|   |  4| 22|   |
|   |  5|   |   |
|   |  6| 33|   |
+---+---+---+---+

【问题讨论】:

    标签: python pyspark


    【解决方案1】:

    这是不使用 udf 的一种方法:

    2019/07/17 更新:调整 SQL stmt 并将 N=6 作为参数添加到 SQL。

    2019/07/16 更新: 删除了临时列 t,在 transform 函数中替换为常量 array(0,1,2,3,4,5)。在这种情况下,我们可以直接对数组元素的值而不是它们的索引进行操作。

    更新:我删除了使用字符串函数并将数组元素中的数据类型全部转换为字符串的原始方法,效率较低。 Spark 2.4+的Spark SQL高阶函数应该比原来的方法好。

    设置

    from pyspark.sql import functions as F, Row
    
    df = spark.createDataFrame([ Row(a=1, b=[1, 2, 3, 4, 5, 6], c=['11', '22', '33'], d=['foo'], e=[111,222]) ])
    
    >>> df.show()
    +---+------------------+------------+-----+----------+
    |  a|                 b|           c|    d|         e|
    +---+------------------+------------+-----+----------+
    |  1|[1, 2, 3, 4, 5, 6]|[11, 22, 33]|[foo]|[111, 222]|
    +---+------------------+------------+-----+----------+
    
    # columns you want to do array-explode
    cols = df.columns
    
    # number of array elements to set
    N = 6
    

    使用 SQL 高阶函数:transform

    使用 Spark SQL 高阶函数:transform(),执行以下操作:

    1. 创建以下 Spark SQL 代码,其中 {0} 将替换为 column_name,{1} 将替换为 N

      stmt = '''
         CASE
            WHEN '{0}' in ('d') THEN
              transform(sequence(0,{1}-1), x -> IF(x == 1, `{0}`[0], NULL))
            WHEN size(`{0}`) <= {1}/2 AND size(`{0}`) > 1 THEN
              transform(sequence(0,{1}-1), x -> IF(((x+1)*size(`{0}`))%{1} == 0, `{0}`[int((x-1)*size(`{0}`)/{1})], NULL))
            ELSE `{0}`
          END AS `{0}`
      '''
      

      注意: 数组转换仅在数组包含多个(除非在单独的WHEN 子句中指定)和&lt;= N/2 元素(在本例中, 1 &lt; size &lt;= 3)。其他大小的数组将保持原样。

    2. 使用 selectExpr() 对所有必需的列运行上述 SQL

      df1 = df.withColumn('a', F.array('a')) \
              .selectExpr(*[ stmt.format(c,N) for c in cols ])
      
      >>> df1.show()
      +---+------------------+----------------+-----------+---------------+
      |  a|                 b|               c|          d|              e|
      +---+------------------+----------------+-----------+---------------+
      |[1]|[1, 2, 3, 4, 5, 6]|[, 11,, 22,, 33]|[, foo,,,,]|[,, 111,,, 222]|
      +---+------------------+----------------+-----------+---------------+
      
    3. 运行 arrays_zipexplode

      df_new = df1.withColumn('vals', F.explode(F.arrays_zip(*cols))) \
                  .select('vals.*') \
                  .fillna('', subset=cols)
      
      >>> df_new.show()
      +----+---+---+---+----+
      |   a|  b|  c|  d|   e|
      +----+---+---+---+----+
      |   1|  1|   |   |null|
      |null|  2| 11|foo|null|
      |null|  3|   |   | 111|
      |null|  4| 22|   |null|
      |null|  5|   |   |null|
      |null|  6| 33|   | 222|
      +----+---+---+---+----+
      

      注意fillna('', subset=cols) 只更改了包含字符串的列

    在一个方法链中:

    df_new = df.withColumn('a', F.array('a')) \
               .selectExpr(*[ stmt.format(c,N) for c in cols ]) \
               .withColumn('vals', F.explode(F.arrays_zip(*cols))) \
               .select('vals.*') \
               .fillna('', subset=cols)
    

    用变换函数解释:

    转换功能(如下所列,反映旧版本的需求)

    transform(sequence(0,5), x -> IF((x*size({0}))%6 == 0, {0}[int(x*size({0})/6)], NULL))
    

    如帖子中所述,{0} 将被替换为列名。这里我们以包含 3 个元素的 column-c 为例:

    • 在变换函数中,sequence(0,5) 创建一个包含 6 个元素的常量数组 array(0,1,2,3,4,5),其余设置 lambda 函数,其中一个参数 x 具有元素的值。
    • IF(condition, true_value, false_value):是标准的SQL函数
    • 我们应用的条件是:(x*size(c))%6 == 0 其中size(c)=3,如果这个条件为真,则返回c[int(x*size(c)/6)] ,否则返回NULL。所以对于 x 从 0 到 5,我们将有:

      ((0*3)%6)==0) true   -->  c[int(0*3/6)] = c[0]
      ((1*3)%6)==0) false  -->  NULL
      ((2*3)%6)==0) true   -->  c[int(2*3/6)] = c[1]
      ((3*3)%6)==0) false  -->  NULL
      ((4*3)%6)==0) true   -->  c[int(4*3/6)] = c[2]
      ((5*3)%6)==0) false  -->  NULL
      

    类似于 column-e,它包含一个 2 元素数组。

    【讨论】:

    • 很好的解释,在“t”列上有点棘手,但是哇,这样做的好方法!
    • 您能否更深入地解释一下transform(t, (x,i) -&gt; IF((i*size({0}))%6 == 0, {0}[int(i*size({0})/6)], NULL))?谢谢!
    • @cincin21,添加了transform函数的解释。
    • 干得好,一定会帮助我完成学习 pyspark 的任务!最好的!
    • @cincin21,只是好奇,该解决方案对您的任务有效还是有什么问题?
    【解决方案2】:

    要获得输出,您必须将 col a 更改为数组并将空值插入 c 数组。

    from pyspark.sql.types import ArrayType, IntegerType
    from pyspark.sql.functions import explode, arrays_zip, col, array
    
    def fillArrayVals(a):
      for i in [1,3,5]:
        a.insert(i,None)
      return a
    
    fillArrayValsUdf = udf(fillArrayVals, ArrayType(IntegerType(), True))    
    
    df = spark.createDataFrame([Row(a=1, b=[1, 2, 3, 4, 5, 6], c=[11, 22, 33], d=['foo'])])
    df = df.withColumn("a", array(col("a"))).withColumn("c", updateArrayUdf("c"))
    df = df.withColumn("tmp", arrays_zip("a","b", "c", "d"))\
       .withColumn("tmp", explode("tmp"))\
       .select(col("tmp.a"), col("tmp.b"), col("tmp.c"), col("tmp.d"))
    

    上面的代码导致,您可以转换为字符串以显示空值而不是 null

    +----+---+----+----+
    |   a|  b|   c|   d|
    +----+---+----+----+
    |   1|  1|  11| foo|
    |null|  2|null|null|
    |null|  3|  22|null|
    |null|  4|null|null|
    |null|  5|  33|null|
    |null|  6|null|null|
    +----+---+----+----+
    

    【讨论】:

    • 好主意,但也许有一个没有 udf 的解决方案,因为在处理数百万行时建议避免使用这些?
    • 嗨@fathomson,我正在尝试执行代码,但首先我还必须添加from pyspark.sql.functions import udf,但仍然不确定如何导入updateArray
    • 嗨@cincin21,在提供的代码中出现错误。 updateArray 用包含函数的 fillArrayVals 重命名
    猜你喜欢
    • 2019-02-23
    • 2017-03-06
    • 2022-01-23
    • 1970-01-01
    • 1970-01-01
    • 2022-01-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多