【问题标题】:How to convert values from Kafka data source to a given schema?如何将值从 Kafka 数据源转换为给定的模式?
【发布时间】:2019-12-01 11:05:39
【问题描述】:

我通过以下代码从 kafka 服务器获取日志:

    Dataset<Row> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", mykey.Kafka_source)
            .option("subscribe", mykey.Kafka_topic)
            .load();

    Dataset<String> dg = df
            .selectExpr("CAST(value AS STRING)")
            .as(STRING());

然而,dg 的一个元素是这样的“姓名:John Doe,年龄:20”,然而它只有一个键“值”。因此,当我将其保存在 HDFS 中时,它会保存为“值:”名称:John Doe,年龄:22“”。但是,我想像这样更改架构:

root  
|-- name: string (nullable = true)  
|-- age: string (nullable = true)  

这样元素就会像“name : John Doe, age : 22”一样保存

当前元素的schema是这样的:

root  
|-- value: string (nullable = true)

我尝试编写代码将 dg 的每个元素转换为 Dataset 的新元素,但我认为 Java 中的结构化流不支持高级函数表达式。我怎样才能做到这一点..?我想要一些使用 StructType 的解决方案。

【问题讨论】:

    标签: java apache-spark apache-spark-sql spark-structured-streaming


    【解决方案1】:

    您只需将value 转换为预期的架构。

    如果值是 JSON 格式,您将使用 from_json 标准函数之一:

    from_json(e: Column, schema: Column): Column
    

    对于其他格式,您必须应用转换(带有或不带有 UDF)来进行转换。

    【讨论】:

      猜你喜欢
      • 2021-09-12
      • 2020-12-26
      • 1970-01-01
      • 2019-06-01
      • 2021-04-07
      • 2013-04-11
      • 1970-01-01
      • 2018-02-22
      • 1970-01-01
      相关资源
      最近更新 更多