【问题标题】:Split string column based on delimiter and create columns for each value in Pyspark根据分隔符拆分字符串列并为 Pyspark 中的每个值创建列
【发布时间】:2021-03-13 00:16:08
【问题描述】:

我有 1000 个包含以下格式数据的文件:

a|b|c|clm4=1|clm5=3
a|b|c|clm4=9|clm6=60|clm7=23

我想阅读它并将其转换为如下数据框:

clm1|clm2|clm3|clm4|clm5|clm6|clm7
a|b|c|1|3|null|null
a|b|c|9|null|60|23

我尝试了以下方法:

files = [f for f in glob.glob(pathToFile + "/**/*.txt.gz", recursive=True)]
df = spark.read.load(files, format='csv', sep = '|', header=None)

但它给了我以下结果:

clm1, clm2, clm3, clm4, clm5
a, b, c, 1, 3
a, b, c, 9, null

【问题讨论】:

标签: python apache-spark pyspark apache-spark-sql pyspark-dataframes


【解决方案1】:

对于 Spark 2.4+,您可以将文件作为单列读取,然后将其拆分为 |。您将获得一个数组列,您可以使用 higher-order functions 对其进行转换:

df.show(truncate=False)

+----------------------------+
|clm                         |
+----------------------------+
|a|b|c|clm4=1|clm5=3         |
|a|b|c|clm4=9|clm6=60|clm7=23|
+----------------------------+

我们使用transform 函数将我们从clm 列拆分得到的字符串数组转换为结构数组。 每个结构都包含列名(如果存在)(检查字符串是否包含=)或将其命名为clm + (i+1),其中i 是它的位置。

transform_expr = """
transform(split(clm, '[|]'), (x, i) -> 
                   struct(
                         IF(x like '%=%', substring_index(x, '=', 1), concat('clm', i+1)), 
                         substring_index(x, '=', -1)
                         )
        )
"""

现在使用map_from_entries 将数组转换为映射。最后,分解地图并旋转以获取您的列

df.select("clm", 
          explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")
         ) \
  .groupby("clm").pivot('col_name').agg(first('col_value')) \
  .drop("clm") \
  .show(truncate=False)

给予:

+----+----+----+----+----+----+----+
|clm1|clm2|clm3|clm4|clm5|clm6|clm7|
+----+----+----+----+----+----+----+
|a   |b   |c   |9   |null|60  |23  |
|a   |b   |c   |1   |3   |null|null|
+----+----+----+----+----+----+----+

【讨论】:

  • 感谢您的帮助。有什么方法可以在上面的代码中设置条件以仅选择现有列名列表中存在的那些列?
猜你喜欢
  • 2021-11-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-09-19
  • 2013-10-19
  • 1970-01-01
  • 2019-05-17
  • 2022-01-04
相关资源
最近更新 更多