【问题标题】:Json string should be consumed as Kafka topic without schema in spark structured streamingJson 字符串应作为 Kafka 主题使用,在 Spark 结构化流中没有模式
【发布时间】:2020-09-23 19:22:37
【问题描述】:

我需要使用 Kafka 主题,它为每一行生成动态 Json 字符串。我无法解析没有模式的 Json 字符串。就我而言,Schema 可以是动态的。 spark.read.json 可以推断 json 架构。但它需要“DATASET”或“JSON 文件”。

有什么方法可以将 Kafka 主题(值)转换为 DATASET?这样我就可以使用 spark.read.json,它接受 DATASET 作为输入,它可以解析 json 的模式。

但是如果我使用下面的代码。

val klines = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "host1:port1,host2:port2").
  option("subscribe", "topic").
  load().
  select($"value".cast("string").alias("value"))

val query = klines.
  select(from_json($"value",schema=spark.read.json(klines.as[String]).schema)).
  writeStream.
  format("console").
  start()

query.awaitTermination()

出现以下错误: 线程 "main" org.apache.spark.sql.AnalysisException 中的异常:必须使用 writeStream.start() 执行带有流源的查询;; 卡夫卡

我正在做一些中间计算,例如展平架构。但如果我这样做,就会发生同样的错误。我如何处理 spark 结构化流(scala)中的基本中间计算?

【问题讨论】:

  • 您使用的是火花流式传输还是火花结构化流式传输?如果您不使用架构,那么您将在哪里加载这些数据?
  • @OneCricketeer:Spark 结构化流。它可以是复杂的嵌套模式。将架构展平,并在加载前在运行时找到架构..
  • @JavaTechnical:我无法将 Kafka 输出(select($"value".cast( to ="string").alias(alias = "value")))更改为 Seq。请问有什么解决方法吗?
  • @Raja 您始终可以将消息反序列化为字符串(在 Kafka 消费者中使用 StringDeserializer),然后您将获得的值将是字符串,您最终可以对其进行解析。所以你不需要强制转换为String,只需将value.deserializer 转换为StringDeserializer 并解析即可

标签: json apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

JSON 是一个字符串。你可以只是一个字符串类型的模式。

这样我就可以使用 spark.read.json

spark.read.json 来自文件系统

如果您想从 Kafka 读取数据,您可能需要 spark.readStream.format("kafka"),Spark 文档中对此进行了足够详细的描述

Spark 文档中的第一个示例正是这样做的

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

您在对数据进行任何类型的有用分析时都会遇到问题,但鉴于每条记录都有可能不共享相同的字段,因此像 get_json_object 这样的操作将毫无意义

可以说你最好使用原始 Kafka 消费者 API 或 KStreams,它们不需要任何架构,但是你的问题是 不是 架构 - 它是 反序列化为具有可以查询的具体字段的对象类型

【讨论】:

  • 我需要在没有来自 kafka 主题的模式的情况下动态推断 json 字符串。源模式往往会改变。这是我的目标。唯一的方法是使用“spark.read.json”。但我不能。我也在做一些中间计算,比如使模式变胖。以以下错误结束。“线程“主”org.apache.spark.sql.AnalysisException 中的异常:必须使用 writeStream.start();;kafka 执行带有流源的查询”
  • 我觉得你没有看我写的东西。 Spark 无法在 SparkSQL 中使用动态模式。您需要使用 Spark Streaming,而不是 Spark Structured Streaming。 或者你可以使用普通的、普通的 Kafka 消费者
  • 使用 Spark 流来推断动态 json。有用。谢谢
  • 但是在使用火花流时..代码反应异常大容量。我使用了 stream.foreachRDD(rdd => {val jsonDfall = spark.read.json(rdd)...每个 rdd 都有具有不同模式的多条记录...我想在记录级别推断它..
  • 我不确定spark.read.json(rdd) 是如何工作的,因为我认为该方法只能读取文件路径,而不是 RDD
猜你喜欢
  • 2018-06-29
  • 2021-07-02
  • 2019-09-19
  • 1970-01-01
  • 2018-09-21
  • 1970-01-01
  • 1970-01-01
  • 2019-08-01
  • 1970-01-01
相关资源
最近更新 更多