【问题标题】:How to extract a JSON string from a column in Spark(如何从 Spark 中的列中提取 JSON 字符串)
【发布时间】:2021-07-27 17:12:54
【问题描述】:

假设我有一个这样的数据框,其中 json_column 列的类型为 StringType():

json_column
{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}
{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}

我想将此 json 的所有字段提取到单独的列中,如下所示:

line_1 house_number city name
Test street 123 New York Test1
Test street 456 Los Angeles Test2

【问题讨论】:

    标签: dataframe apache-spark pyspark apache-spark-sql aws-glue


    【解决方案1】:

    我认为有一种更简单的方法:

    import pyspark.sql.functions as f
    from pyspark import Row
    from pyspark.shell import spark
    from pyspark.sql import DataFrame
    
    raw_json = [
        '{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}',
        '{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}',
    ]
    df: DataFrame = spark.createDataFrame([Row(json_column=text) for text in raw_json])

    # Explicit DDL schema for the JSON payload, so from_json does not need to infer it.
    schema = 'STRUCT<`address`: STRUCT<`city`: STRING, `houseNumber`: BIGINT, `line1`: STRING>, `name`: STRING>'
    df = df.withColumn('obj', f.from_json('json_column', schema))

    # Output column name -> path inside the parsed struct (insertion order = column order).
    projections = {
        'line_1': 'obj.address.line1',
        'house_number': 'obj.address.houseNumber',
        'city': 'obj.address.city',
        'name': 'obj.name',
    }
    df = df.select([f.col(path).alias(alias) for alias, path in projections.items()])
    df.show(truncate=False)
    

    输出:

    +-----------+------------+-----------+-----+
    |line_1     |house_number|city       |name |
    +-----------+------------+-----------+-----+
    |Test street|123         |New York   |Test1|
    |Test street|456         |Los Angeles|Test2|
    +-----------+------------+-----------+-----+
    

    UPDATE(通用函数)

    import pyspark.sql.functions as f
    from pyspark import Row
    from pyspark.shell import spark
    from pyspark.sql import DataFrame
    
    
    def get_schema(dataframe: DataFrame, column: str):
        """Infer the schema of a JSON string column from its first non-null value.

        Only one sample row is inspected, so fields that appear only in later rows
        are not part of the inferred schema.

        :param dataframe: DataFrame containing the JSON string column
        :param column: name of the column holding JSON strings
        :return: a Column expression (``schema_of_json`` of a literal) usable as the
            ``schema`` argument of ``from_json``
        :raises ValueError: if ``column`` has no non-null value to infer from
        """
        row = dataframe.where(f.col(column).isNotNull()).select(column).first()
        # first() returns None when no row matched; fail with a clear message
        # instead of an AttributeError on None.
        if row is None:
            raise ValueError(
                f"Column {column!r} has no non-null values to infer a JSON schema from"
            )
        # The projection above has exactly one column, so index by position; this
        # also works when `column` is a dotted path whose result name differs.
        return f.schema_of_json(f.lit(row[0]))
    
    
    def flatten(dataframe, column):
        """Repeatedly expand the struct columns derived from ``column`` into flat columns.

        A struct column is expanded when its name is ``column`` itself or starts
        with ``column + '_'``; each struct field becomes a column named
        ``<parent>_<field>``. The loop repeats until no such struct column remains,
        so nested structs are flattened level by level. All other columns,
        including struct columns of unrelated names that merely share a textual
        prefix, are left untouched.

        :param dataframe: DataFrame to flatten
        :param column: name of the (parsed) struct column to expand
        :return: the flattened DataFrame
        """
        # Adapted from https://stackoverflow.com/a/49532496/6080276 answer
        prefix = column + '_'
        while True:
            nested_cols = [name for name, dtype in dataframe.dtypes
                           if dtype.startswith('struct')
                           and (name == column or name.startswith(prefix))]
            if not nested_cols:
                break

            nested = set(nested_cols)
            flat_cols = [name for name in dataframe.columns if name not in nested]
            dataframe = dataframe.select(
                flat_cols +
                [f.col(nc).getField(c).alias(nc + '_' + c)
                 for nc in nested_cols
                 # Field names come straight from the schema: no extra select('.*').
                 for c in dataframe.schema[nc].dataType.fieldNames()])
        return dataframe
    
    
    def extract_json(dataframe, column_name):
        """Parse the JSON strings in ``column_name`` and flatten them into columns.

        The schema is inferred by ``get_schema``; the parsed struct replaces the
        original column and is then expanded by ``flatten`` into
        ``<column_name>_<field>`` columns.
        """
        inferred = get_schema(dataframe, column_name)
        parsed = f.from_json(column_name, inferred).alias(column_name)
        return flatten(dataframe.withColumn(column_name, parsed), column_name)
    
    
    samples = [
        '{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}',
        '{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}',
    ]
    # Both columns carry the same JSON payload for each row.
    df: DataFrame = spark.createDataFrame(
        [Row(json_column=text, another_json=text) for text in samples]
    )

    df.show(truncate=False)

    # Extract each JSON column in turn, showing the frame after every step.
    for json_col in ('json_column', 'another_json'):
        df = extract_json(dataframe=df, column_name=json_col)
        df.show(truncate=False)
    

    第一个输出(数据帧):

    +-----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
    |json_column                                                                                    |another_json                                                                                   |
    +-----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
    |{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}   |{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}   |
    |{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|
    +-----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
    

    第二个输出(提取 json_column 之后):

    +-----------------------------------------------------------------------------------------------+----------------+------------------------+-------------------------------+-------------------------+
    |another_json                                                                                   |json_column_name|json_column_address_city|json_column_address_houseNumber|json_column_address_line1|
    +-----------------------------------------------------------------------------------------------+----------------+------------------------+-------------------------------+-------------------------+
    |{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}   |Test1           |New York                |123                            |Test street              |
    |{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|Test2           |Los Angeles             |456                            |Test street              |
    +-----------------------------------------------------------------------------------------------+----------------+------------------------+-------------------------------+-------------------------+
    

    第三个输出(提取 another_json 之后):

    +----------------+------------------------+-------------------------------+-------------------------+-----------------+-------------------------+--------------------------------+--------------------------+
    |json_column_name|json_column_address_city|json_column_address_houseNumber|json_column_address_line1|another_json_name|another_json_address_city|another_json_address_houseNumber|another_json_address_line1|
    +----------------+------------------------+-------------------------------+-------------------------+-----------------+-------------------------+--------------------------------+--------------------------+
    |Test1           |New York                |123                            |Test street              |Test1            |New York                 |123                             |Test street               |
    |Test2           |Los Angeles             |456                            |Test street              |Test2            |Los Angeles              |456                             |Test street               |
    +----------------+------------------------+-------------------------------+-------------------------+-----------------+-------------------------+--------------------------------+--------------------------+
    

    【讨论】:

    • 你会怎么做这个通用的?
    • 我更新了答案。这就是我在工作中实施的方式。如果您想添加更多功能,例如自动重命名列,请随意复制和更改
    【解决方案2】:

    我编写了下面这个类:

    """This module provides methods and classes for extracting jsons out
    of data frames and adding them as columns"""
    from typing import List, Tuple, Union
    
    from pyspark.sql.dataframe import DataFrame
    from pyspark.sql.functions import *
    from pyspark.sql.types import StructField, StructType, StringType, DataType
    
    
    # pylint: disable=too-few-public-methods
    class CfJsonStructField:
        """
        Describe one field of a JSON document that should be extracted into a column.

        It can be used to rename the column while extracting and acts as a wrapper
        around the spark.sql class StructField.

        :param str old_column_name: name of the field inside the JSON document
        :param str new_column_name: name of the resulting column; when None the
            column keeps ``old_column_name``
        :param DataType column_type: Spark type the field is read as
            (default ``StringType()``)
        :param list nested_json_struct_fields: left None if the field has a simple
            data type. If the field is a nested JSON object, a list of
            CfJsonStructField objects describing its sub-fields is passed.
        """

        # pylint: disable=too-many-arguments
        def __init__(
            self,
            old_column_name: str,
            new_column_name: str = None,
            column_type=StringType(),
            nested_json_struct_fields=None,
        ):
            self.old_column_name = old_column_name
            self.new_column_name = (
                old_column_name if new_column_name is None else new_column_name
            )
            self.column_type = column_type
            self.nested_json_struct_fields = nested_json_struct_fields

        def __repr__(self):
            """Return a string representation that is at the same time code for the
            ``Union[Tuple, str]`` schema description used by
            ``extract_json_from_column``.

            The new column name and the column type are only included when they
            differ from the defaults (same name, ``StringType``)."""
            new_col_str = ""
            col_type_str = ""
            nested_fields_str = ""
            if self.old_column_name != self.new_column_name:
                new_col_str = f", '{self.new_column_name}'"
            if not isinstance(self.column_type, StringType):
                # Depending on the PySpark version, repr() of a parameterless type is
                # either "IntegerType" or "IntegerType()", and parameterized types
                # already end in ")"; only append "()" when it is missing so the
                # output stays valid constructor-call code.
                type_repr = repr(self.column_type)
                if not type_repr.endswith(")"):
                    type_repr += "()"
                col_type_str = f", {type_repr}"
            if self.nested_json_struct_fields:
                nested_fields_str = f", {self.nested_json_struct_fields}"
            return f"('{self.old_column_name}'{new_col_str}{col_type_str}{nested_fields_str})"

        def construct_spark_struct_field(self):
            """Create a spark.sql StructField for this field, keyed by the name the
            field has inside the JSON (``old_column_name``)."""
            # For nested JSON fields column_type is normally left at StringType, so the
            # sub-object is read as text and can be parsed again in a later pass
            # (presumably Spark keeps the raw JSON text here — confirm per version).
            return StructField(
                # Per default we make every column nullable. May be subject to future change
                name=self.old_column_name, dataType=self.column_type, nullable=True
            )
    
    
    def construct_spark_struct_type(schema: List[CfJsonStructField]) -> StructType:
        """Build a spark.sql StructType whose fields mirror the given
        CfJsonStructFields, in the same order."""
        return StructType(
            [json_field.construct_spark_struct_field() for json_field in schema]
        )
    
    
    def extract_json_from_column(
        data_frame: DataFrame, column: str, schema: List[Union[Tuple, str]]
    ) -> DataFrame:
        """Extract a json from a column and add the selected fields as new columns.

        Each entry of ``schema`` is either a field name (extracted as a string column
        with the same name) or a tuple
        ``(field_name[, new_column_name][, data_type_or_nested_schema])``: the
        optional string is the new column name, and the optional last element is
        either a Spark ``DataType`` or a nested list in the same format (the nested
        JSON object is then extracted recursively). The source column is dropped.

        :param data_frame: input DataFrame
        :param column: name of the column holding the JSON strings
        :param schema: description of the fields to extract, as explained above
        :return: DataFrame with ``column`` replaced by the extracted fields
        """

        def _col_desc_to_cf_json_struct_field(col_desc: Union[Tuple, str]) -> CfJsonStructField:
            """Convert one schema entry (str or tuple) into a CfJsonStructField."""
            if isinstance(col_desc, str):
                return CfJsonStructField(col_desc)
            old_column_name, *rest = col_desc
            new_column_name = None
            # An optional string right after the field name is the new column name.
            if rest and isinstance(rest[0], str):
                new_column_name = rest.pop(0)
            if not rest:
                return CfJsonStructField(old_column_name, new_column_name)
            spec = rest[0]
            if isinstance(spec, DataType):
                return CfJsonStructField(old_column_name, new_column_name, spec)
            return CfJsonStructField(
                old_column_name,
                new_column_name,
                nested_json_struct_fields=[
                    _col_desc_to_cf_json_struct_field(t) for t in spec],
            )

        def _temp_column_name(taken) -> str:
            """Return a column name that collides with none of the names in ``taken``."""
            taken = set(taken)
            name = "__cf_json_data"
            suffix = 0
            while name in taken:
                suffix += 1
                name = f"__cf_json_data_{suffix}"
            return name

        def _extract(
            data_frame: DataFrame, column: str, schema: List[CfJsonStructField]
        ) -> DataFrame:
            """Parse ``column`` with ``schema``, replace it by the parsed fields and
            recurse into nested JSON fields."""
            # A fixed helper name such as "data" would overwrite (and then drop) an
            # existing column of that name, so pick one that is not in use.
            tmp = _temp_column_name(
                list(data_frame.columns) + [field.new_column_name for field in schema]
            )
            data_frame = (
                data_frame
                .withColumn(tmp, from_json(column, construct_spark_struct_type(schema)))
                .drop(column)
            )
            # Project every field explicitly under its new name; withColumnRenamed
            # would also rename unrelated columns that happen to share the old name.
            data_frame = data_frame.select(
                "*",
                *[col(tmp).getField(field.old_column_name).alias(field.new_column_name)
                  for field in schema],
            ).drop(tmp)

            for field in schema:
                if field.nested_json_struct_fields is not None:
                    data_frame = _extract(
                        data_frame,
                        column=field.new_column_name,
                        schema=field.nested_json_struct_fields,
                    )
            return data_frame

        return _extract(
            data_frame,
            column,
            [_col_desc_to_cf_json_struct_field(t) for t in schema]
        )
    

    现在可以像这样调用 extract_json_from_column:

    from pyspark.sql.types import IntegerType

    # NOTE: `data_frame` is not defined in this snippet; it stands for the caller's
    # DataFrame whose string column 'json_column' holds the JSON documents.
    has_json_extracted = extract_json_from_column(
        data_frame=data_frame,
        column='json_column',
        schema=[
            'name',
            ('address', [
                    ('line1', 'line_1'),
                    ('houseNumber', 'house_number', IntegerType()),
                    'city',
            ])
        ]
    )
    

    元组是这样构建的:

    1. 'field_name'
    2. 新列的名称(如果与字段名相同,则可以省略);如果是嵌套字段,则为描述其子字段的列表
    3. Spark 数据类型(默认为 StringType())

    【讨论】:

      【解决方案3】:

      转换为 RDD 然后再读取将是最简单的方法。

      tl;dr:

      from pyspark.sql import Row
      from pyspark.sql.functions import col

      # Assumes an active SparkSession bound to `spark` (e.g. inside the pyspark shell).
      df=spark.createDataFrame([
          Row(json_column='{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}'),
          Row(json_column='{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}')
      ])
      # Take exactly the JSON string of each row; flatMap(lambda x: x) would also
      # emit the values of any other columns the DataFrame might have.
      new_rdd = df.rdd.map(lambda row: row.json_column)
      new_df = spark.read.json(new_rdd)
      new_df = new_df.select(col("address.*"),col("name"))
      

      说明

      >>> from pyspark.sql.functions import col
      >>> df=spark.createDataFrame([
          Row(json_column='{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}'),
          Row(json_column='{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}')
      ])
      >>> df.show(truncate=False)
      +-----------------------------------------------------------------------------------------------+
      |json_column                                                                                    |
      +-----------------------------------------------------------------------------------------------+
      |{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}   |
      |{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|
      +-----------------------------------------------------------------------------------------------+
      
      >>> df.printSchema()
      root
       |-- json_column: string (nullable = true)
      
      >>> new_rdd = df.rdd.flatMap(lambda x:x)
      >>> new_rdd.take(2)
      ['{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}', '{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}']
      
      >>> new_df = spark.read.json(new_rdd)
      
      >>> new_df.show(truncate=False)
      +-------------------------------+-----+
      |address                        |name |
      +-------------------------------+-----+
      |[New York, 123, Test street]   |Test1|
      |[Los Angeles, 456, Test street]|Test2|
      +-------------------------------+-----+
      
      >>> new_df.printSchema()
      root
       |-- address: struct (nullable = true)
       |    |-- city: string (nullable = true)
       |    |-- houseNumber: long (nullable = true)
       |    |-- line1: string (nullable = true)
       |-- name: string (nullable = true)
      
      >>> new_df = new_df.select(col("address.*"),col("name"))
      >>> new_df.show()
      +-----------+-----------+-----------+-----+
      |       city|houseNumber|      line1| name|
      +-----------+-----------+-----------+-----+
      |   New York|        123|Test street|Test1|
      |Los Angeles|        456|Test street|Test2|
      +-----------+-----------+-----------+-----+
      
      

      【讨论】:

        【解决方案4】:

        此类行为有一个专用函数:json_tuple()。在这种情况下,它有点复杂,因为有一个嵌套的 JSON,但它仍然可以使用。

        import pyspark.sql.functions as F
        from pyspark import Row
        from pyspark.shell import spark
        
        df = spark.createDataFrame([
            Row(json_column='{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}'),
            Row(json_column='{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}')
        ])

        # json_tuple names its outputs c0, c1, ... by default; aliasing them explicitly
        # avoids ending up with two columns called "c1" in the second select and makes
        # a positional toDF() rename unnecessary. All json_tuple outputs are strings.
        df = (
          df
          .select(F.json_tuple(F.col("json_column"), "address", "name").alias("address", "name"))
          .select(
              F.json_tuple(F.col("address"), "line1", "houseNumber", "city")
              .alias("line_1", "house_number", "city"),
              "name",
          )
        )
        df.show(2, False)
        

        结果:

        +-----------+------------+-----------+-----+
        |line_1     |house_number|city       |name |
        +-----------+------------+-----------+-----+
        |Test street|123         |New York   |Test1|
        |Test street|456         |Los Angeles|Test2|
        +-----------+------------+-----------+-----+
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2022-09-28
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2013-10-13
          • 2022-11-11
          • 2019-02-09
          相关资源
          最近更新 更多