【问题标题】:How to create JSON structure from a pyspark dataframe?如何从 pyspark 数据框创建 JSON 结构?
【发布时间】:2021-10-13 18:18:17
【问题描述】:

我正在尝试从 pyspark 数据框创建 JSON 结构。我的数据框中有以下列 - batch_id、batch_run_id、table_name、column_name、column_datatype、last_refresh_time、refresh_frequency、owner

我想要它在下面的 JSON 结构中 -

{
"GeneralInfo": {
    "DataSetID": "xxxx1234Abcsd", 
    "Owner" : ["test1@email.com", "test2@email.com", "test3@email.com"]
    "Description": "", 
    "BuisnessFunction": "", 
    "Source": "", 
    "RefreshRate": "Weekly",
    "LastUpdate": "2020/10/15", 
    "InfoSource": "TemplateInfo"
  },
  "Tables": [
    {
      "TableName": "Employee",
      "Columns" : [
               { "ColumnName" : "EmployeeID",
                  "ColumnDataType": "int"
               },
               { "ColumnName" : "EmployeeName",
                  "ColumnDataType": "string"
               }
            ]
    }
   }
}

我正在尝试通过数据框列索引分配 JSON 字符串中的值,但它给了我一个错误,因为“类型列的对象不是 JSON 可序列化的”。我用过如下-

{
"GeneralInfo": {
    "DataSetID": df["batch_id"], 
    "Owner" : list(df["owner"])
    "Description": "", 
    "BuisnessFunction": "", 
    "Source": "", 
    "RefreshRate": df["refresh_frequency"],
    "LastUpdate": df["last_update_time"], 
    "InfoSource": "TemplateInfo"
  },
  "Tables": [
    {
      "TableName": df["table_name"],
      "Columns" : [
               { "ColumnName" : df["table_name"]["column_name"],
                  "ColumnDataType": df["table_name"]["column_datatype"]
               } 
     
            ]
     }
  }
}

样本数据 -

请帮帮我,我刚开始在 Pyspark 中编码。

【问题讨论】:

  • 能否粘贴示例数据框?
  • @MohanaBC 我已经更新了我的帖子,请检查。谢谢
  • @MohanaBC - 你检查了吗?
  • 是的,但没有像您预期的那样获得单个 JSON 元素。

标签: python json apache-spark pyspark apache-spark-sql


【解决方案1】:

尝试从您提供的示例数据中获取 JSON 格式,输出格式与您预期的不完全匹配。您可以进一步即兴创作以下代码。

我们可以使用 toJSON 函数将数据帧转换为 JSON 格式。在调用 toJSON 函数之前,我们需要使用 array(),通过传递所需的列来匹配 JSON 格式的结构函数。

from pyspark.sql import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').getOrCreate()

in_values = [
    (123, '123abc', 'Employee', 'Employee_id', 'int', '21/05/15', 'Weekly',
     ['test1@gmail.com', 'test1@gmail.com', 'test3@gmail.com']),
    (123, '123abc', 'Employee', 'Employee_name', 'string', '21/05/15', 'Weekly',
     ['test1@gmail.com', 'test1@gmail.com', 'test3@gmail.com'])
]

cols = ["batch_id", "batch_run_id", "table_name", "column_name", "column_datatype",
        "last_update_time", "refresh_frequency", "Owner"]


df = spark.createDataFrame(in_values).toDF(*cols)\
    .selectExpr("*","'' Description", "'' BusinessFunction", "'TemplateInfo' InfoSource", "'' Source")

list1 = [df["batch_id"].alias("DataSetID"), df["Owner"], df["refresh_frequency"].alias("RefreshRate"),
         df["last_update_time"].alias("LastUpdate"), "Description", "BusinessFunction","InfoSource", "Source"]

list2 = [df["table_name"].alias("TableName"),df["column_name"].alias("ColumnName"),
         df["column_datatype"].alias("ColumnDataType")]

df.groupBy("batch_id") \
    .agg(collect_set(struct(*list1))[0].alias("GeneralInfo"),
         collect_list(struct(*list2)).alias("Tables")).drop("batch_id") \
    .toJSON().foreach(print)

# outputs JSON --->
    '''
     {
   "GeneralInfo":{
         "DataSetID":123,
         "Owner":[
            "test1@gmail.com",
            "test1@gmail.com",
            "test3@gmail.com"
         ],
         "RefreshRate":"Weekly",
         "LastUpdate":"21/05/15",
         "Description":"",
         "BusinessFunction":"",
         "InfoSource":"TemplateInfo",
         "Source":""
      },
   "Tables":[
      {
         "TableName":"Employee",
         "ColumnName":"Employee_id",
         "ColumnDataType":"int"
      },
      {
         "TableName":"Employee",
         "ColumnName":"Employee_name",
         "ColumnDataType":"string"
      }
   ]
}
'''
  

【讨论】:

  • 感谢分享。我看到您已经硬编码列名来创建列表。任何动态方式来处理这个而不是硬编码?
  • 另外,{ "GeneralInfo": { "DataSetID": "xxxx1234Abcsd", "Owner" : ["test1@email.com", "test2@email.com", "test3@email.com"] "Description": "", "BuisnessFunction": "", "Source": "", "RefreshRate": "Weekly", "LastUpdate": "2020/10/15", "InfoSource": "TemplateInfo" } 将在 {} 下而不是 []
  • 我们可以使用生成器动态生成列表,我们需要在这里考虑列重命名,因为您的 json 格式有不同的名称。
  • 从 [] 即数组到 {} 即结构,我们可以改变它。让我知道您是否对第二部分“表格”感到满意:[....]?
  • 更新了代码以仅获取结构。是否需要将 table_name 之类的列重命名为 TableName?或者数据的列名是否为 TableName?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-21
  • 1970-01-01
  • 2019-04-27
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多