【发布时间】:2017-07-19 07:24:53
【问题描述】:
我试图重现 [Databricks][1] 中的示例并将其应用于 Kafka 的新连接器并触发结构化流,但是我无法使用 Spark 中的开箱即用方法正确解析 JSON。 ..
注意:主题以 JSON 格式写入 Kafka。
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", IP + ":9092")
.option("zookeeper.connect", IP + ":2181")
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load()
下面的代码行不通,相信是因为json列是字符串,和from_json方法签名不匹配...
val df = ds1.select($"value" cast "string" as "json")
.select(from_json("json") as "data")
.select("data.*")
有什么建议吗?
【问题讨论】:
标签: scala apache-spark apache-kafka apache-kafka-connect spark-structured-streaming