【问题标题】:How to Extract Columns From BinaryType Using pySpark Databricks?如何使用 pySpark Databricks 从 BinaryType 中提取列?
【发布时间】:2019-12-02 20:04:43
【问题描述】:

问题:从数据框的二进制类型列中提取列。数据框是从 azure 的 blob 存储帐户加载的。

环境:

  • Databricks 5.4(包括 Apache Spark 2.4.3)
  • Python 3.5.2

流程:

  1. 从 avro 文件中获取数据
  2. 提取有用信息并将更用户友好的版本写回镶木地板

Avro 架构:


    SequenceNumber:long
    Offset:string
    EnqueuedTimeUtc:string
    SystemProperties:map
        key:string
        value:struct
            member0:long
            member1:double
            member2:string
            member3:binary
    Properties:map
        key:string
        value:struct
            member0:long
            member1:double
            member2:string
            member3:binary
    Body:binary

我很难从Body:binary 获取数据。我设法使用下面的代码 sn-p 将列转换为字符串

df = df.withColumn("Body", col("Body").cast("string"))

我设法使用下面的代码提取了正文列中的列列表:

        #body string looks like json
        dfBody = df.select(df.Body)
        jsonList = (dfBody.collect())
        jsonString = jsonList[0][0]
        columns = []
        data = json.loads(jsonString)

        for key, value in data.items():
            columns.append(key)

        columns.sort()
        print(columns) 

该列表有一些有趣的列,例如 ID、状态、名称。

问题: 如何添加位于 body 二进制列中的 ID 列并添加到我当前的数据框中。一般来说,我想展平二进制列。二进制列也可能有数组。

【问题讨论】:

    标签: python pyspark avro azure-databricks


    【解决方案1】:

    您不想收集数据框。相反,您应该能够投射和展平身体场。从外观上看,您正在使用来自事件中心的 avro 捕获。这是我用来处理此问题的代码:

    from pyspark.sql.types import StringType, IntegerType, StructType, StructField
    from pyspark.sql.functions import from_json, col
    
    # Create a schema that describes the Body field
    sourceSchema = StructType([
            StructField("Attribute1", StringType(), False),
            StructField("Attribute2", StringType(), True),
            StructField("Attribute3", StringType(), True),
            StructField("Attribute4", IntegerType(), True)])
    
    
    # Convert Body to String and then Json applying the schema
    df = df.withColumn("Body", col("Body").cast("string"))
    jsonOptions = {"dateFormat" : "yyyy-MM-dd HH:mm:ss.SSS"}
    df = df.withColumn("Body", from_json(df.Body, sourceSchema, jsonOptions))
    
    # Flatten Body
    for c in df.schema['Body'].dataType:
        df = df.withColumn(c.name, col("Body." + c.name))
    

    我认为您需要的关键位是 from_json 函数。

    【讨论】:

      猜你喜欢
      • 2020-07-11
      • 1970-01-01
      • 2021-09-16
      • 2021-12-23
      • 1970-01-01
      • 1970-01-01
      • 2019-10-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多