【Title】:pyspark dataframe change column with two arrays into columns
【Posted】:2019-04-07 13:52:46
【Question】:

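I have been searching around but have not yet found a way to restructure a dataframe column so that new columns are added to the dataframe dynamically, based on the array contents. I am new to python, so I may be searching for the wrong terms, which could be why I have not found a clear example. Please let me know if this is a duplicate and point me to the reference link. I think I just need to be pointed in the right direction.

OK, the details.

The environment is pyspark 2.3.2 and python 2.7.

The sample column contains 2 arrays that are correlated to each other 1 to 1. I would like to create a column for each value in the titles array and put the corresponding name (from the person array) in the respective column.

I cobbled together an example to focus on my problem of changing the dataframe.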

import json
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql import functions as f

input = (
    {"sample": {"titles": ["Engineer", "Designer", "Manager"], "person": ["Mary", "Charlie", "Mac"]}, "location": "loc a"},
    {"sample": {"titles": ["Engineer", "Owner"], "person": ["Tom", "Sue"]}, "location": "loc b"},
    {"sample": {"titles": ["Engineer", "Designer"], "person": ["Jane", "Bill"]}, "location": "loc a"},
)

a = [json.dumps(input)]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)

Here is the schema of my dataframe:

In [4]: df.printSchema()
root
 |-- location: string (nullable = true)
 |-- sample: struct (nullable = true)
 |    |-- person: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- titles: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

The data in my dataframe:

In [5]: df.show(truncate=False)
+--------+-----------------------------------------------------+
|location|sample                                               |
+--------+-----------------------------------------------------+
|loc a   |[[Mary, Charlie, Mac], [Engineer, Designer, Manager]]|
|loc b   |[[Sue, Tom], [Owner, Engineer]]                      |
|loc a   |[[Jane, Bill], [Engineer, Designer]]                 |
+--------+-----------------------------------------------------+

What I would like my dataframe to look like:

+--------+-----------------------------------------------------+------------+-----------+---------+---------+
|location|sample                                               |Engineer    |Designer   |Manager  | Owner   |
+--------+-----------------------------------------------------+------------+-----------+---------+---------+
|loc a   |[[Mary, Charlie, Mac], [Engineer, Designer, Manager]]|Mary        |Charlie    |Mac      |         |
|loc b   |[[Sue, Tom], [Owner, Engineer]]                      |Tom         |           |         |Sue      |
|loc a   |[[Jane, Bill], [Engineer, Designer]]                 |Jane        |Bill       |         |         |
+--------+-----------------------------------------------------+------------+-----------+---------+---------+

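I have tried using the explode function, only to end up with more records that still carry the array fields in each record. There are some examples on stackoverflow, but they have static column names. In this dataset the titles can appear in any order, and new titles can be added later.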

【Comments】:

    Tags: python-2.7 apache-spark pyspark


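    【Answer 1】:
    1. Without explode

    2. With explode

      • Add a unique ID using monotonically_increasing_id.
      • Explode the two arrays together using one of the methods shown in "Pyspark: Split multiple array columns into rows", or explode the map created with the first method.
      • pivot the result: group by the added id and any other fields you want to keep, pivot by title, and take first(person).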
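
The explode steps listed above might look roughly like the sketch below. It targets the question's dataframe (columns location and sample, with sample.person and sample.titles) and assumes Spark 2.3, where a small UDF is one way to pair the two arrays into a map. The names title_map and pivot_titles are made up for illustration, and pyspark is imported inside the function only so that the pure-Python helper can be tried without a Spark session.

```python
def title_map(sample):
    """Pair each title with its person; `sample` exposes 'person' and 'titles'."""
    if sample is None:
        return None
    # Titles become the keys so they can later be pivoted into column names.
    return dict(zip(sample["titles"], sample["person"]))


def pivot_titles(df):
    """Sketch: unique id -> explode the title map -> pivot the titles into columns."""
    from pyspark.sql import functions as f  # deferred import, see the note above

    as_map = f.udf(title_map, "map<string,string>")
    return (df
            # a unique id keeps rows with the same location apart
            .withColumn("id", f.monotonically_increasing_id())
            # exploding a map yields one row per entry, in columns `key` and `value`
            .select("id", "location", f.explode(as_map("sample")))
            .groupBy("id", "location")
            .pivot("key")
            .agg(f.first("value")))
```

Calling pivot_titles(df) would be expected to return one row per original record with a column per distinct title. Rows whose sample is null or has no titles disappear after explode, so this is not a drop-in replacement if those rows must be kept.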

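    【Comments】:

    • Thanks for the quick reply. I like the first approach (without explode); it looks cleaner and more straightforward. The dictionary it creates is backwards, though: the keys should be the titles. I changed the udf zip statement to "return dict(zip(x[1],x[0])) if x else None". Is that correct in python, or should I use a different approach?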
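
On the comment's question: dict(zip(x[1], x[0])) is ordinary Python and pairs titles with people as intended, provided x[0] holds person and x[1] holds titles, which matches the schema in the question where person precedes titles inside sample. The sketch below runs the same expression on plain tuples, with no Spark involved, to show a few behaviours worth knowing. The sample values and the safe_as_dict variant are illustrative assumptions, not part of the original answer.

```python
def as_dict(x):
    # Same expression as in the comment: x[0] is person, x[1] is titles.
    return dict(zip(x[1], x[0])) if x else None


# Normal case: titles become keys, people become values.
normal = as_dict((["Tom", "Sue"], ["Engineer", "Owner"]))

# zip stops at the shorter array, so the unmatched title "Owner" is dropped.
uneven = as_dict((["Tom"], ["Engineer", "Owner"]))

# A repeated title keeps only the last person paired with it.
repeated = as_dict((["Tom", "Ann"], ["Engineer", "Engineer"]))

# `if x` only guards against a missing struct; a struct whose inner array
# is None still reaches zip, which raises TypeError.
try:
    as_dict((None, ["Engineer"]))
    inner_none_raises = False
except TypeError:
    inner_none_raises = True


def safe_as_dict(x):
    # Hypothetical variant that also tolerates a missing inner array.
    if not x or x[0] is None or x[1] is None:
        return None
    return dict(zip(x[1], x[0]))
```

Whether the extra guard is needed depends on the data: if sample.person or sample.titles can be null in real records, the unguarded version would fail inside the UDF.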
    【Answer 2】:

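    @user10601094 helped me answer this question. I am posting the complete solution below to help others who may have a similar problem.

    I am not very fluent in python, so please feel free to suggest better approaches.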

    In [1]: import json
       ...: from pyspark.sql import functions as f
       ...: 
    
    In [2]: # define a sample data set
       ...: input = { "sample": {    "titles": ["Engineer", "Designer", "Manager"],    "person": ["Mary", "Charlie", "Mac"]  },  "location": "loc a"},{ "sample": {    "titles": ["Engineer", "Owner"],
       ...:     "person": ["Tom", "Sue"]  },  "location": "loc b"},{ "sample": {    "titles": ["Engineer", "Designer"],    "person": ["Jane", "Bill"]  },  "location": "loc a"}
    
    In [3]: # create a dataframe with the sample json data
       ...: a = [json.dumps(input)]
       ...: jsonRDD = sc.parallelize(a)
       ...: df = spark.read.json(jsonRDD)
       ...: 
    2018-11-03 20:48:09 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
    
    In [4]: # Change the array in the sample column to a dictionary
       ...: # swap the columns so the titles are the key
       ...: 
       ...: # UDF to convert 2 arrays into a map
       ...: @f.udf("map<string,string>")
       ...: def as_dict(x):
       ...:     return dict(zip(x[1],x[0])) if x else None
       ...: 
    
    In [5]: # create a new dataframe based on the original dataframe
       ...: dfmap = df.withColumn("sample", as_dict("sample"))
    
    In [6]: # Convert sample column to be title columns based on the map
       ...: 
       ...: # get the columns names, stored in the keys
       ...: keys = (dfmap
       ...:     .select(f.explode("sample"))
       ...:     .select("key")
       ...:     .distinct()
       ...:     .rdd.flatMap(lambda x: x)
       ...:     .collect())
    
    In [7]: # build one column expression per title key
       ...: exprs = [f.col("sample").getItem(k).alias(k) for k in keys]
       ...: 
    
    In [8]: dfmap.select(dfmap.location, *exprs).show()
    +--------+--------+--------+-------+-----+
    |location|Designer|Engineer|Manager|Owner|
    +--------+--------+--------+-------+-----+
    |   loc a| Charlie|    Mary|    Mac| null|
    |   loc b|    null|     Tom|   null|  Sue|
    |   loc a|    Bill|    Jane|   null| null|
    +--------+--------+--------+-------+-----+
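
The key-collection and getItem steps in In [6] and In [7] can be pictured in plain Python, as a rough analogy rather than Spark code: gather every distinct key across the per-row maps, then look each key up in each row, with a missing key giving None much as getItem gives null in the output above. The variable names are illustrative. Sorting the keys is an added assumption: distinct() followed by collect() does not promise any particular order, so sorting is one way to keep the column order stable between runs.

```python
# One map per row, as produced by the as_dict UDF in the transcript above.
row_maps = [
    {"Engineer": "Mary", "Designer": "Charlie", "Manager": "Mac"},
    {"Engineer": "Tom", "Owner": "Sue"},
    {"Engineer": "Jane", "Designer": "Bill"},
]

# Distinct keys across all rows, sorted for a reproducible column order.
keys = sorted({k for m in row_maps for k in m})

# One value per key and row; dict.get returns None when a title is absent.
table = [[m.get(k) for k in keys] for m in row_maps]

for row in table:
    print(row)
```

In the Spark version, the comparable change would be to wrap the collected list in sorted(...) before building exprs.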
    

    【Comments】:
