【问题标题】:Issue while reading json from kinesis to pyspark将 json 从 kinesis 读取到 pyspark 时出现问题
【发布时间】:2018-09-21 13:11:45
【问题描述】:

我正在尝试从 Kinesis 读取流式 JSON 数据到 PySpark。我的 JSON 看起来像:

{'installmentNo': '10', 'loanId': '1'}

我已经指定了架构,但是当 spark 读取数据时我得到“null”。下面是代码sn-p。

from pyspark.sql.types import *
from pyspark.sql.functions import from_json

fields = [

  StructField("installmentNo", IntegerType(), True),
  StructField("loanId", IntegerType(), True)

]
pythonSchema = StructType(fields)

kinesisDf = spark.readStream \
.format("kinesis")\
.option("streamName", kinesisStreamName)\
.option("region", kinesisRegion)\
.option("initialPosition", "latest")\
.option("awsAccessKey", awsAccessKeyId)\
.option("awsSecretKey", awsSecretKey).load()

dataDevicesDF = kinesisDf.selectExpr("cast (data as STRING) my_json_data").select(from_json("my_json_data", pythonSchema).alias("yp_inst")).select("yp_inst.*")
display(dataDevicesDF)

输出:

但是,当我删除“from_json”部分时,我得到一个带有 JSON 字符串的列。但我想将 json 分解为特定的列并将数据作为 df 获取。有人可以建议我进行更改吗?

【问题讨论】:

    标签: apache-spark pyspark amazon-kinesis


    【解决方案1】:

    架构不正确 - 您的数据是字符串,而您声明的是整数。

    请将定义改为

    pythonSchema = StructType([
        StructField("installmentNo", StringType(), True),
        StructField("loanId", StringType(), True)
    ])
    

    并转换输出:

    from_json(
        "my_json_data", pythonSchema
    ).cast("struct<installmentNo: integer, loanId: integer>"))
    

    其余代码应保持原样,但为清楚起见,您可以显式设置选项(因为输入不是标准 JSON):

    from_json(
        "my_json_data", pythonSchema, {"allowSingleQuotes": "true"}
    ).cast("struct<installmentNo: integer, loanId: integer>"))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-06
      • 2014-03-01
      • 1970-01-01
      • 2015-08-18
      • 2015-07-17
      相关资源
      最近更新 更多