【问题标题】:How to convert kafka message value to a particular schema?如何将 kafka 消息值转换为特定模式?
【发布时间】:2021-09-12 14:40:53
【问题描述】:

我正在尝试使用 Pyspark 从 Kafka 主题中读取数据。我想将该数据转换为特定的模式。但做不到。

这是我尝试过的:

>> df = spark.read.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load()
    
>> userSchema = StructType().add("Name", StringType(), True).add("Age", IntegerType(), True)

>> df1 = df.selectExpr("CAST(value AS STRING)")

>> df2 = df1.select(from_json(col("value"), userSchema))

>> df2.printSchema()
root
 |-- jsontostructs(value): struct (nullable = true)
 |    |-- Name: string (nullable = true)
 |    |-- Age: integer (nullable = true)

我想要的是:

>> df2.printSchema()
root
|-- Name: string (nullable = true)
|-- Age: integer (nullable = true)

有什么方法可以得到想要的架构?

【问题讨论】:

  • 你的输入数据/json长什么样子?

标签: json apache-spark pyspark spark-structured-streaming spark-kafka-integration


【解决方案1】:

对于面临同样问题的任何人,以下是我实现此目标的方法:

 df2 = df1.select(from_json(col("value"),userSchema)).select("jsontostructs(value).*")

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-12-01
    • 1970-01-01
    • 2020-02-07
    • 2020-12-15
    • 1970-01-01
    • 2018-09-05
    • 2021-11-29
    • 2017-10-31
    相关资源
    最近更新 更多