【发布时间】:2020-09-23 19:22:37
【问题描述】:
我需要使用 Kafka 主题,它为每一行生成动态 Json 字符串。我无法解析没有模式的 Json 字符串。就我而言,Schema 可以是动态的。 spark.read.json 可以推断 json 架构。但它需要“DATASET”或“JSON 文件”。
有什么方法可以将 Kafka 主题(值)转换为 DATASET?这样我就可以使用 spark.read.json,它接受 DATASET 作为输入,它可以解析 json 的模式。
但是如果我使用下面的代码。
val klines = spark.
readStream.
format("kafka").
option("kafka.bootstrap.servers", "host1:port1,host2:port2").
option("subscribe", "topic").
load().
select($"value".cast("string").alias("value"))
val query = klines.
select(from_json($"value",schema=spark.read.json(klines.as[String]).schema)).
writeStream.
format("console").
start()
query.awaitTermination()
出现以下错误: 线程 "main" org.apache.spark.sql.AnalysisException 中的异常:必须使用 writeStream.start() 执行带有流源的查询;; 卡夫卡
我正在做一些中间计算,例如展平架构。但如果我这样做,就会发生同样的错误。我如何处理 spark 结构化流(scala)中的基本中间计算?
【问题讨论】:
-
您使用的是火花流式传输还是火花结构化流式传输?如果您不使用架构,那么您将在哪里加载这些数据?
-
@OneCricketeer:Spark 结构化流。它可以是复杂的嵌套模式。将架构展平,并在加载前在运行时找到架构..
-
@JavaTechnical:我无法将 Kafka 输出(select($"value".cast( to ="string").alias(alias = "value")))更改为 Seq。请问有什么解决方法吗?
-
@Raja 您始终可以将消息反序列化为字符串(在 Kafka 消费者中使用 StringDeserializer),然后您将获得的值将是字符串,您最终可以对其进行解析。所以你不需要强制转换为String,只需将
value.deserializer转换为StringDeserializer并解析即可
标签: json apache-spark apache-kafka spark-structured-streaming