【问题标题】:Map DataFrame columns映射 DataFrame 列
【发布时间】:2021-06-12 21:40:23
【问题描述】:

我对 Python 很陌生,并尝试使用 Azure Databricks 将数据从制表符分隔的文件转换为 json。我有以下输入数据:

ID  Title   NumberA NumberB
1   test1   0       1
2   test2   2       3

我正在尝试将其转换为 json 的形式:

[
    {
        "ID": 1,
        "Title": "title1",
        "Numbers": [
            {
                "Type": "TypeA",
                "Code": "0"
            },
            {
                "Type": "TypeB",
                "Code": "1"
            }
        ]
    },
    {
        "ID": 2,
        "Title": "title2",
        "Numbers": [
            {
                "Type": "TypeA",
                "Code": "2"
            },
            {
                "Type": "TypeB",
                "Code": "3"
            }
        ]
    }
]

我的输入数据有这样的架构:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

class MySchemas:
    def input_struct():
        schema = StructType()
        schema.add('ID', IntegerType(), True)
        schema.add('Title', StringType(), True)
        schema.add('NumberA', IntegerType(), True)
        schema.add('NumberB', IntegerType(), True)
        schema.add('_corrupt_record', StringType(), True)
        return schema

在另一个类中,我正在使用如下模式读取输入数据:

df_schema = MySchemas.input_struct()
inputs = self.spark.read.option('sep', "\t").option("header","true").option('mode', 'PERMISSIVE').schema(df_schema).csv(self.sourceFilePath, quote="", escape="")

我不知道最后一部分是怎么做的。我需要获取NumberANumberB 列并将它们替换为一个名为Numbers 的列,这是一个对象列表,其中包含一个名为Type 的文字字符串列,另一列包含来自NumberA 和@ 的值987654330@.

有人可以告诉我该怎么做吗?我正在使用带有 Spark 2.4.3 的 Databricks Runtime 5.5 LTS。

【问题讨论】:

    标签: python json apache-spark pyspark apache-spark-sql


    【解决方案1】:

    您可以创建结构数组并转换为 json:

    import pyspark.sql.functions as F
    
    result = inputs.select(
        'ID', 'Title', 
        F.array(
            F.struct(
                F.lit('TypeA').alias('Type'), 
                F.col('NumberA').alias('Code')
            ), 
            F.struct(
                F.lit('TypeB').alias('Type'), 
                F.col('NumberB').alias('Code')
            )
        ).alias('Numbers')
    ).agg(
        F.collect_list(
            F.to_json(F.struct('ID', 'Title', 'Numbers'))
        ).alias('col')
    )
    
    result.show(truncate=False)
    +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |col                                                                                                                                                                                 |
    +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |[{"ID":1,"Title":"test1","Numbers":[{"Type":"TypeA","Code":0},{"Type":"TypeB","Code":1}]}, {"ID":2,"Title":"test2","Numbers":[{"Type":"TypeA","Code":2},{"Type":"TypeB","Code":3}]}]|
    +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    

    如果你想输出 JSON 文件,你可以跳过.agg 部分,然后做result.write.json('filepath')

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-05-16
      • 1970-01-01
      • 2016-09-02
      • 2018-07-13
      • 1970-01-01
      • 1970-01-01
      • 2021-08-04
      • 2018-02-20
      相关资源
      最近更新 更多