【问题标题】:Spark Streaming : Reading data from kafka that has multiple schemaSpark Streaming:从具有多个模式的kafka读取数据
【发布时间】:2018-04-04 21:15:37
【问题描述】:

我正在为 spark 流的实现而苦苦挣扎。

来自 kafka 的消息看起来像这样,但字段更多

{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}

我正在尝试从 Kafka 主题(具有多个模式)中读取消息。我需要阅读每条消息并查找事件和源字段并决定将其存储为数据集的位置。实际数据以 JSON 形式在字段有效负载中,它只是一条记录。

有人可以帮我实现这个或任何其他替代方案吗?

在同一主题中发送具有多个模式的消息并消费它是一种好方法吗?

提前致谢,

【问题讨论】:

  • 生产者是否可以一次发送每条消息,您可以尝试一下。然后您可以将此字符串解析为jsonString并获取键和值。
  • 您使用的是 avro 架构吗?如果是这样,avro 的模式演变将解决您的问题。如果没有,请尝试使用 avro 架构。
  • @Nilesh 是的,我可以在解析后获取键值,但是如何从解析后的数据创建数据集并将其存储在某处,它只包含该消息中的一条记录。

标签: apache-spark apache-kafka spark-streaming apache-spark-dataset


【解决方案1】:

您可以从传入的 JSON 对象创建 Dataframe

创建 Seq[Sring] 的 JSON 对象。

使用 val df=spark.read.json[Seq[String]]

在您选择的dataframe df 上执行操作。

【讨论】:

    【解决方案2】:

    如果您只关心某些列,请将 JsonString 转换为 JavaBean

    【讨论】:

    • 我可以得到有效载荷字段,这是我需要的确切数据,但是我如何根据它们的案例类分离这些消息?
    猜你喜欢
    • 2017-12-20
    • 2017-01-26
    • 2014-10-19
    • 1970-01-01
    • 2019-06-24
    • 2023-03-08
    • 2018-01-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多