【发布时间】:2018-09-21 13:11:45
【问题描述】:
我正在尝试从 Kinesis 读取流式 JSON 数据到 PySpark。我的 JSON 看起来像:
{'installmentNo': '10', 'loanId': '1'}
我已经指定了架构,但是当 spark 读取数据时我得到“null”。下面是代码sn-p。
from pyspark.sql.types import *
from pyspark.sql.functions import from_json
fields = [
StructField("installmentNo", IntegerType(), True),
StructField("loanId", IntegerType(), True)
]
pythonSchema = StructType(fields)
kinesisDf = spark.readStream \
.format("kinesis")\
.option("streamName", kinesisStreamName)\
.option("region", kinesisRegion)\
.option("initialPosition", "latest")\
.option("awsAccessKey", awsAccessKeyId)\
.option("awsSecretKey", awsSecretKey).load()
dataDevicesDF = kinesisDf.selectExpr("cast (data as STRING) my_json_data").select(from_json("my_json_data", pythonSchema).alias("yp_inst")).select("yp_inst.*")
display(dataDevicesDF)
输出:
但是,当我删除“from_json”部分时,我得到一个带有 JSON 字符串的列。但我想将 json 分解为特定的列并将数据作为 df 获取。有人可以建议我进行更改吗?
【问题讨论】:
标签: apache-spark pyspark amazon-kinesis