【发布时间】:2019-05-16 23:23:48
【问题描述】:
我正在使用 Spark Structured Streaming 在 Scala 中编写一个 Spark 应用程序,该应用程序从 Kafka 接收一些以 JSON 样式格式化的数据。此应用程序可以接收以这种方式格式化的单个或多个 JSON 对象:
[{"key1":"value1","key2":"value2"},{"key1":"value1","key2":"value2"},...,{"key1":"value1","key2":"value2"}]
我试图定义一个 StructType 像:
var schema = StructType(
Array(
StructField("key1",DataTypes.StringType),
StructField("key2",DataTypes.StringType)
))
但它不起作用。 我解析 JSON 的实际代码:
var data = (this.stream).getStreamer().load() .selectExpr("CAST (value AS STRING) as json") .select(from_json($"json",schema=schema).as("data"))
我想在类似的数据框中获取此 JSON 对象
+----------+---------+
| key1| key2|
+----------+---------+
| value1| value2|
| value1| value2|
........
| value1| value2|
+----------+---------+
有人可以帮帮我吗? 谢谢!
【问题讨论】:
-
在转换为 JSON 之前,爆炸你的数组,它应该可以工作。
-
参考此链接,stackoverflow.com/questions/48361177/… 我无法发表评论,因为我还没有到达那里..
-
@Sc0rpion,架构始终相同。结构是问题
-
@vindev 我试过了,还是不行
-
@Vinc 你能分享你的尝试吗?你遇到了什么错误?
标签: json scala apache-spark spark-streaming-kafka